/**
 * Try to access a block on a data node by creating a block reader for it.
 * If the access fails, an exception is thrown.
 *
 * @param datanode the data node to connect to; its transfer address is used
 * @param lblock the located block supplying the block and its block token
 * @throws IOException if connecting, creating the reader or closing the
 *           reader fails
 */
private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
  ExtendedBlock block = lblock.getBlock();
  InetSocketAddress targetAddr =
      NetUtils.createSocketAddr(datanode.getXferAddr());
  Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();

  BlockReader blockReader = null;
  try {
    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

    String file = BlockReaderFactory.getFileName(targetAddr,
        "test-blockpoolid", block.getBlockId());
    blockReader = BlockReaderFactory.newBlockReader(
        new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
        true, "TestDataNodeVolumeFailure", TcpPeerServer.peerFromSocket(s),
        datanode, null, null, null, false);
  } finally {
    if (blockReader == null) {
      // No reader was created, so nothing else will release the socket.
      try {
        s.close();
      } catch (IOException ignored) {
        // Best-effort cleanup; do not hide whatever went wrong above.
      }
    }
  }
  // NOTE(review): closing the reader is presumed to release the peer that
  // wraps the socket - confirm against BlockReader#close.
  blockReader.close();
}
/** * try to access a block on a data node. If fails - throws exception * * @param datanode * @param lblock * @throws IOException */ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) throws IOException { InetSocketAddress targetAddr = null; Socket s = null; ExtendedBlock block = lblock.getBlock(); targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); String file = BlockReaderFactory .getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); BlockReaderFactory .newBlockReader(conf, s, file, block, lblock.getBlockToken(), 0, -1, null); // nothing - if it fails - it will throw and exception }
/**
 * Try to access a block on a data node by creating a block reader for it.
 * If the access fails, an exception is thrown.
 *
 * @param datanode the data node to connect to; its transfer address is used
 * @param lblock the located block supplying the block and its block token
 * @throws IOException if connecting, creating the reader or closing the
 *           reader fails
 */
private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
  ExtendedBlock block = lblock.getBlock();
  InetSocketAddress targetAddr =
      NetUtils.createSocketAddr(datanode.getXferAddr());
  Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();

  BlockReader blockReader = null;
  try {
    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

    String file = BlockReaderFactory.getFileName(targetAddr,
        "test-blockpoolid", block.getBlockId());
    blockReader = BlockReaderFactory.newBlockReader(
        new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
        true, "TestDataNodeVolumeFailure", TcpPeerServer.peerFromSocket(s),
        datanode, null, null, null, false,
        CachingStrategy.newDefaultStrategy());
  } finally {
    if (blockReader == null) {
      // No reader was created, so nothing else will release the socket.
      try {
        s.close();
      } catch (IOException ignored) {
        // Best-effort cleanup; do not hide whatever went wrong above.
      }
    }
  }
  // NOTE(review): closing the reader is presumed to release the peer that
  // wraps the socket - confirm against BlockReader#close.
  blockReader.close();
}
@Test(timeout=60000) public void testDataXceiverCleansUpSlotsOnFailure() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testDataXceiverCleansUpSlotsOnFailure", sockDir); conf.setLong( HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final Path TEST_PATH1 = new Path("/test_file1"); final Path TEST_PATH2 = new Path("/test_file2"); final int TEST_FILE_LEN = 4096; final int SEED = 0xFADE1; DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN, (short)1, SEED); DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN, (short)1, SEED); // The first read should allocate one shared memory segment and slot. DFSTestUtil.readFileBuffer(fs, TEST_PATH1); // The second read should fail, and we should only have 1 segment and 1 slot // left. BlockReaderFactory.setFailureInjectorForTesting( new TestCleanupFailureInjector()); try { DFSTestUtil.readFileBuffer(fs, TEST_PATH2); } catch (Throwable t) { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry()); cluster.shutdown(); sockDir.close(); }
/**
 * Installs a {@code TestPreReceiptVerificationFailureInjector}, creates and
 * reads two files, and checks that the data node's short-circuit registry
 * reports one segment and two slots. The mini cluster and the socket
 * directory are released even when the test fails part-way through.
 */
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
    conf.setLong(
        HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      // NOTE(review): the injector installed here is not reset by this
      // test - confirm that later tests do not rely on the default.
      BlockReaderFactory.setFailureInjectorForTesting(
          new TestPreReceiptVerificationFailureInjector());
      final Path TEST_PATH1 = new Path("/test_file1");
      DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
      final Path TEST_PATH2 = new Path("/test_file2");
      DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
      ShortCircuitRegistry registry =
          cluster.getDataNodes().get(0).getShortCircuitRegistry();
      registry.visit(new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          Assert.assertEquals(1, segments.size());
          Assert.assertEquals(2, slots.size());
        }
      });
    } finally {
      // Shut the cluster down even when an assertion above fails.
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
/**
 * Builds a block reader against the first location of the given block and
 * asserts on the outcome.
 *
 * @param conf configuration used for the client settings, the cache context
 *          and the socket factory
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed when true a reader must have been built; when false
 *          the build must have failed with an InvalidBlockTokenException
 */
private static void tryRead(final Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr,
        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsServerConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
        connectedPeer = TcpPeerServer.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  BlockReader reader = null;
  IOException failure = null;
  try {
    final DatanodeInfo firstNode = lblock.getLocations()[0];
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(firstNode.getXferAddr());
    reader = new BlockReaderFactory(new DFSClient.Conf(conf))
        .setFileName(BlockReaderFactory.getFileName(dnAddr,
            "test-blockpoolid", blk.getBlockId()))
        .setBlock(blk)
        .setBlockToken(lblock.getBlockToken())
        .setInetSocketAddress(dnAddr)
        .setStartOffset(0)
        .setLength(-1)
        .setVerifyChecksum(true)
        .setClientName("TestBlockTokenWithDFS")
        .setDatanodeInfo(firstNode)
        .setCachingStrategy(CachingStrategy.newDefaultStrategy())
        .setClientCacheContext(ClientContext.getFromConf(conf))
        .setConfiguration(conf)
        .setRemotePeerFactory(peerFactory)
        .build();
  } catch (IOException ex) {
    failure = ex;
  }

  // A reader only exists when the build completed; release it before the
  // assertions run.
  if (reader != null) {
    try {
      reader.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  if (!shouldSucceed) {
    Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
        + "when it is expected to be invalid", failure);
    Assert.assertTrue(
        "OP_READ_BLOCK failed due to reasons other than access token: ",
        failure instanceof InvalidBlockTokenException);
    return;
  }
  Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
      + "when it is expected to be valid", reader);
}
/**
 * Attempts to reach a block on a data node by building a block reader for
 * it and closing the reader again. Any failure surfaces as an exception.
 *
 * @param datanode the data node whose transfer address is contacted
 * @param lblock the located block supplying the block and its block token
 * @throws IOException if building or closing the block reader fails
 */
private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
  final ExtendedBlock blk = lblock.getBlock();
  final InetSocketAddress dnAddr =
      NetUtils.createSocketAddr(datanode.getXferAddr());

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr,
        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsServerConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
        connectedPeer = TcpPeerServer.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  final BlockReader reader =
      new BlockReaderFactory(new DFSClient.Conf(conf))
          .setInetSocketAddress(dnAddr)
          .setBlock(blk)
          .setFileName(BlockReaderFactory.getFileName(dnAddr,
              "test-blockpoolid", blk.getBlockId()))
          .setBlockToken(lblock.getBlockToken())
          .setStartOffset(0)
          .setLength(-1)
          .setVerifyChecksum(true)
          .setClientName("TestDataNodeVolumeFailure")
          .setDatanodeInfo(datanode)
          .setCachingStrategy(CachingStrategy.newDefaultStrategy())
          .setClientCacheContext(ClientContext.getFromConf(conf))
          .setConfiguration(conf)
          .setRemotePeerFactory(peerFactory)
          .build();
  reader.close();
}
/**
 * Builds a block reader against the first location of the given block and
 * asserts on the outcome.
 *
 * @param conf configuration used for the client settings, the cache
 *          context, the tracer and the socket factory
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed when true a reader must have been built; when false
 *          the build must have failed with an InvalidBlockTokenException
 */
protected void tryRead(final Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr,
        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsConstants.READ_TIMEOUT);
        connectedPeer = DFSUtilClient.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  BlockReader reader = null;
  IOException failure = null;
  try {
    final DatanodeInfo firstNode = lblock.getLocations()[0];
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(firstNode.getXferAddr());
    reader = new BlockReaderFactory(new DfsClientConf(conf))
        .setFileName(BlockReaderFactory.getFileName(dnAddr,
            "test-blockpoolid", blk.getBlockId()))
        .setBlock(blk)
        .setBlockToken(lblock.getBlockToken())
        .setInetSocketAddress(dnAddr)
        .setStartOffset(0)
        .setLength(-1)
        .setVerifyChecksum(true)
        .setClientName("TestBlockTokenWithDFS")
        .setDatanodeInfo(firstNode)
        .setCachingStrategy(CachingStrategy.newDefaultStrategy())
        .setClientCacheContext(ClientContext.getFromConf(conf))
        .setConfiguration(conf)
        .setTracer(FsTracer.get(conf))
        .setRemotePeerFactory(peerFactory)
        .build();
  } catch (IOException ex) {
    failure = ex;
  }

  // A reader only exists when the build completed; release it before the
  // assertions run.
  if (reader != null) {
    try {
      reader.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  if (!shouldSucceed) {
    Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
        + "when it is expected to be invalid", failure);
    Assert.assertTrue(
        "OP_READ_BLOCK failed due to reasons other than access token: ",
        failure instanceof InvalidBlockTokenException);
    return;
  }
  Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
      + "when it is expected to be valid", reader);
}
/**
 * Attempts to reach a block on a data node by building a block reader for
 * it and closing the reader again. Any failure surfaces as an exception.
 *
 * @param datanode the data node whose transfer address is contacted
 * @param lblock the located block supplying the block and its block token
 * @throws IOException if building or closing the block reader fails
 */
private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
  final ExtendedBlock blk = lblock.getBlock();
  final InetSocketAddress dnAddr =
      NetUtils.createSocketAddr(datanode.getXferAddr());

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr,
        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsConstants.READ_TIMEOUT);
        connectedPeer = DFSUtilClient.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  final BlockReader reader =
      new BlockReaderFactory(new DfsClientConf(conf))
          .setInetSocketAddress(dnAddr)
          .setBlock(blk)
          .setFileName(BlockReaderFactory.getFileName(dnAddr,
              "test-blockpoolid", blk.getBlockId()))
          .setBlockToken(lblock.getBlockToken())
          .setStartOffset(0)
          .setLength(-1)
          .setVerifyChecksum(true)
          .setClientName("TestDataNodeVolumeFailure")
          .setDatanodeInfo(datanode)
          .setCachingStrategy(CachingStrategy.newDefaultStrategy())
          .setClientCacheContext(ClientContext.getFromConf(conf))
          .setConfiguration(conf)
          .setTracer(FsTracer.get(conf))
          .setRemotePeerFactory(peerFactory)
          .build();
  reader.close();
}
public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, Configuration conf, DFSClient.Conf dfsConf, DataEncryptionKey encryptionKey) throws IOException { if (chunkSizeToView == 0) return; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); // Use the block name for file name. String file = BlockReaderFactory.getFileName(addr, poolId, blockId); BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), new DatanodeID(addr.getAddress().getHostAddress(), addr.getHostName(), poolId, addr.getPort(), 0, 0), null, null, null, false); final byte[] buf = new byte[amtToRead]; int readOffset = 0; int retries = 2; while ( amtToRead > 0 ) { int numRead = amtToRead; try { blockReader.readFully(buf, readOffset, amtToRead); } catch (IOException e) { retries--; if (retries == 0) throw new IOException("Could not read data from datanode"); continue; } amtToRead -= numRead; readOffset += numRead; } blockReader.close(); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); }
/**
 * Connects to the first location of the given block, creates a block
 * reader for it and asserts on the outcome. The socket is always closed
 * before the final checks.
 *
 * @param conf configuration used for the socket factory and client settings
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed whether creating the reader is expected to work; an
 *          InvalidBlockTokenException is only accepted when this is false
 */
private static void tryRead(Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();
  Socket sock = null;
  BlockReader reader = null;
  try {
    final DatanodeInfo[] locations = lblock.getLocations();
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(locations[0].getXferAddr());
    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
    sock.connect(dnAddr, HdfsServerConstants.READ_TIMEOUT);
    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

    final String blockFile = BlockReaderFactory.getFileName(dnAddr,
        "test-blockpoolid", blk.getBlockId());
    reader = BlockReaderFactory.newBlockReader(
        new DFSClient.Conf(conf), blockFile, blk, lblock.getBlockToken(),
        0, -1, true, "TestBlockTokenWithDFS",
        TcpPeerServer.peerFromSocket(sock), locations[0],
        null, null, null, false);
  } catch (IOException ex) {
    if (!(ex instanceof InvalidBlockTokenException)) {
      fail("OP_READ_BLOCK failed due to reasons other than access token: "
          + StringUtils.stringifyException(ex));
    } else {
      // A rejected token is only acceptable when failure was expected.
      assertFalse("OP_READ_BLOCK: access token is invalid, "
          + "when it is expected to be valid", shouldSucceed);
      return;
    }
  } finally {
    if (sock != null) {
      try {
        sock.close();
      } catch (IOException ignored) {
        // Failure to close the socket is deliberately ignored.
      }
    }
  }

  if (reader == null) {
    fail("OP_READ_BLOCK failed due to reasons other than access token");
  }
  assertTrue("OP_READ_BLOCK: access token is valid, "
      + "when it is expected to be invalid", shouldSucceed);
}
public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, Configuration conf, DataEncryptionKey encryptionKey) throws IOException { if (chunkSizeToView == 0) { return; } Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); int amtToRead = (int) Math.min(chunkSizeToView, blockSize - offsetIntoBlock); // Use the block name for file name. String file = BlockReaderFactory.getFileName(addr, poolId, blockId); BlockReader blockReader = BlockReaderFactory.newBlockReader(conf, s, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, encryptionKey); byte[] buf = new byte[(int) amtToRead]; int readOffset = 0; int retries = 2; while (amtToRead > 0) { int numRead = amtToRead; try { blockReader.readFully(buf, readOffset, amtToRead); } catch (IOException e) { retries--; if (retries == 0) { throw new IOException("Could not read data from datanode"); } continue; } amtToRead -= numRead; readOffset += numRead; } blockReader = null; s.close(); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); }
/**
 * Connects to the first location of the given block, creates a block
 * reader for it and asserts on the outcome. The socket is always closed
 * before the final checks.
 *
 * @param conf configuration used for the socket factory and block reader
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed whether creating the reader is expected to work; an
 *          InvalidBlockTokenException is only accepted when this is false
 */
private static void tryRead(Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();
  Socket sock = null;
  BlockReader reader = null;
  try {
    final DatanodeInfo[] locations = lblock.getLocations();
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(locations[0].getXferAddr());
    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
    sock.connect(dnAddr, HdfsServerConstants.READ_TIMEOUT);
    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

    final String blockFile = BlockReaderFactory.getFileName(dnAddr,
        "test-blockpoolid", blk.getBlockId());
    reader = BlockReaderFactory.newBlockReader(conf, sock, blockFile, blk,
        lblock.getBlockToken(), 0, -1, null);
  } catch (IOException ex) {
    if (!(ex instanceof InvalidBlockTokenException)) {
      fail("OP_READ_BLOCK failed due to reasons other than access token: "
          + StringUtils.stringifyException(ex));
    } else {
      // A rejected token is only acceptable when failure was expected.
      assertFalse("OP_READ_BLOCK: access token is invalid, "
          + "when it is expected to be valid", shouldSucceed);
      return;
    }
  } finally {
    if (sock != null) {
      try {
        sock.close();
      } catch (IOException ignored) {
        // Failure to close the socket is deliberately ignored.
      }
    }
  }

  if (reader == null) {
    fail("OP_READ_BLOCK failed due to reasons other than access token");
  }
  assertTrue("OP_READ_BLOCK: access token is valid, "
      + "when it is expected to be invalid", shouldSucceed);
}
public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, Configuration conf, DFSClient.Conf dfsConf, DataEncryptionKey encryptionKey) throws IOException { if (chunkSizeToView == 0) return; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); // Use the block name for file name. String file = BlockReaderFactory.getFileName(addr, poolId, blockId); BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), new DatanodeID(addr.getAddress().getHostAddress(), addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null, null, null, false, CachingStrategy.newDefaultStrategy()); final byte[] buf = new byte[amtToRead]; int readOffset = 0; int retries = 2; while ( amtToRead > 0 ) { int numRead = amtToRead; try { blockReader.readFully(buf, readOffset, amtToRead); } catch (IOException e) { retries--; if (retries == 0) throw new IOException("Could not read data from datanode"); continue; } amtToRead -= numRead; readOffset += numRead; } blockReader.close(); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); }
/**
 * Connects to the first location of the given block, creates a block
 * reader for it and asserts on the outcome. The socket is always closed
 * before the final checks.
 *
 * @param conf configuration used for the socket factory and client settings
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed whether creating the reader is expected to work; an
 *          InvalidBlockTokenException is only accepted when this is false
 */
private static void tryRead(Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();
  Socket sock = null;
  BlockReader reader = null;
  try {
    final DatanodeInfo[] locations = lblock.getLocations();
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(locations[0].getXferAddr());
    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
    sock.connect(dnAddr, HdfsServerConstants.READ_TIMEOUT);
    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

    final String blockFile = BlockReaderFactory.getFileName(dnAddr,
        "test-blockpoolid", blk.getBlockId());
    reader = BlockReaderFactory.newBlockReader(
        new DFSClient.Conf(conf), blockFile, blk, lblock.getBlockToken(),
        0, -1, true, "TestBlockTokenWithDFS",
        TcpPeerServer.peerFromSocket(sock), locations[0],
        null, null, null, false, CachingStrategy.newDefaultStrategy());
  } catch (IOException ex) {
    if (!(ex instanceof InvalidBlockTokenException)) {
      fail("OP_READ_BLOCK failed due to reasons other than access token: "
          + StringUtils.stringifyException(ex));
    } else {
      // A rejected token is only acceptable when failure was expected.
      assertFalse("OP_READ_BLOCK: access token is invalid, "
          + "when it is expected to be valid", shouldSucceed);
      return;
    }
  } finally {
    if (sock != null) {
      try {
        sock.close();
      } catch (IOException ignored) {
        // Failure to close the socket is deliberately ignored.
      }
    }
  }

  if (reader == null) {
    fail("OP_READ_BLOCK failed due to reasons other than access token");
  }
  assertTrue("OP_READ_BLOCK: access token is valid, "
      + "when it is expected to be invalid", shouldSucceed);
}
/**
 * Builds a block reader against the first location of the given block and
 * asserts on the outcome.
 *
 * @param conf configuration used for the client settings, the cache context
 *          and the socket factory
 * @param lblock the located block supplying the block, token and locations
 * @param shouldSucceed when true a reader must have been built; when false
 *          the build must have failed with an InvalidBlockTokenException
 */
private static void tryRead(final Configuration conf, LocatedBlock lblock,
    boolean shouldSucceed) {
  final ExtendedBlock blk = lblock.getBlock();

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsServerConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
        connectedPeer = TcpPeerServer.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  BlockReader reader = null;
  IOException failure = null;
  try {
    final DatanodeInfo firstNode = lblock.getLocations()[0];
    final InetSocketAddress dnAddr =
        NetUtils.createSocketAddr(firstNode.getXferAddr());
    reader = new BlockReaderFactory(new DFSClient.Conf(conf))
        .setFileName(BlockReaderFactory.getFileName(dnAddr,
            "test-blockpoolid", blk.getBlockId()))
        .setBlock(blk)
        .setBlockToken(lblock.getBlockToken())
        .setInetSocketAddress(dnAddr)
        .setStartOffset(0)
        .setLength(-1)
        .setVerifyChecksum(true)
        .setClientName("TestBlockTokenWithDFS")
        .setDatanodeInfo(firstNode)
        .setCachingStrategy(CachingStrategy.newDefaultStrategy())
        .setClientCacheContext(ClientContext.getFromConf(conf))
        .setConfiguration(conf)
        .setRemotePeerFactory(peerFactory)
        .build();
  } catch (IOException ex) {
    failure = ex;
  }

  // A reader only exists when the build completed; release it before the
  // assertions run.
  if (reader != null) {
    try {
      reader.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  if (!shouldSucceed) {
    Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
        + "when it is expected to be invalid", failure);
    Assert.assertTrue(
        "OP_READ_BLOCK failed due to reasons other than access token: ",
        failure instanceof InvalidBlockTokenException);
    return;
  }
  Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
      + "when it is expected to be valid", reader);
}
/**
 * Attempts to reach a block on a data node by building a block reader for
 * it and closing the reader again. Any failure surfaces as an exception.
 *
 * @param datanode the data node whose transfer address is contacted
 * @param lblock the located block supplying the block and its block token
 * @throws IOException if building or closing the block reader fails
 */
private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
  final ExtendedBlock blk = lblock.getBlock();
  final InetSocketAddress dnAddr =
      NetUtils.createSocketAddr(datanode.getXferAddr());

  // Opens a plain TCP connection to the data node; the socket is closed
  // again when no peer could be produced from it.
  final RemotePeerFactory peerFactory = new RemotePeerFactory() {
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr)
        throws IOException {
      final Socket socket =
          NetUtils.getDefaultSocketFactory(conf).createSocket();
      Peer connectedPeer = null;
      try {
        socket.connect(addr, HdfsServerConstants.READ_TIMEOUT);
        socket.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
        connectedPeer = TcpPeerServer.peerFromSocket(socket);
        return connectedPeer;
      } finally {
        if (connectedPeer == null) {
          IOUtils.closeSocket(socket);
        }
      }
    }
  };

  final BlockReader reader =
      new BlockReaderFactory(new DFSClient.Conf(conf))
          .setInetSocketAddress(dnAddr)
          .setBlock(blk)
          .setFileName(BlockReaderFactory.getFileName(dnAddr,
              "test-blockpoolid", blk.getBlockId()))
          .setBlockToken(lblock.getBlockToken())
          .setStartOffset(0)
          .setLength(-1)
          .setVerifyChecksum(true)
          .setClientName("TestDataNodeVolumeFailure")
          .setDatanodeInfo(datanode)
          .setCachingStrategy(CachingStrategy.newDefaultStrategy())
          .setClientCacheContext(ClientContext.getFromConf(conf))
          .setConfiguration(conf)
          .setRemotePeerFactory(peerFactory)
          .build();
  reader.close();
}