/** * Should the block access token be refetched on an exception * * @param ex Exception received * @param targetAddr Target datanode address from where exception was received * @return true if block access token has expired or invalid and it should be * refetched */ private static boolean tokenRefetchNeeded(IOException ex, InetSocketAddress targetAddr) { /* * Get a new access token and retry. Retry is needed in 2 cases. 1) * When both NN and DN re-started while DFSClient holding a cached * access token. 2) In the case that NN fails to update its * access key at pre-set interval (by a wide margin) and * subsequently restarts. In this case, DN re-registers itself with * NN and receives a new access key, but DN will delete the old * access key from its memory since it's considered expired based on * the estimated expiration date. */ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr + " : " + ex); return true; } return false; }
public static void checkBlockOpStatus( BlockOpResponseProto response, String logInfo) throws IOException { if (response.getStatus() != Status.SUCCESS) { if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error" + ", status message " + response.getMessage() + ", " + logInfo ); } else { throw new IOException( "Got error" + ", status message " + response.getMessage() + ", " + logInfo ); } } }
/** * Should the block access token be refetched on an exception * * @param ex Exception received * @param targetAddr Target datanode address from where exception was received * @return true if block access token has expired or invalid and it should be * refetched */ protected static boolean tokenRefetchNeeded(IOException ex, InetSocketAddress targetAddr) { /* * Get a new access token and retry. Retry is needed in 2 cases. 1) * When both NN and DN re-started while DFSClient holding a cached * access token. 2) In the case that NN fails to update its * access key at pre-set interval (by a wide margin) and * subsequently restarts. In this case, DN re-registers itself with * NN and receives a new access key, but DN will delete the old * access key from its memory since it's considered expired based on * the estimated expiration date. */ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr + " : " + ex); return true; } return false; }
public static void checkBlockOpStatus( BlockOpResponseProto response, String logInfo) throws IOException { if (response.getStatus() != Status.SUCCESS) { if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error" + ", status message " + response.getMessage() + ", " + logInfo ); } else { throw new IOException( "Got error" + ", status=" + response.getStatus().name() + ", status message " + response.getMessage() + ", " + logInfo ); } } }
static void checkSuccess( BlockOpResponseProto status, Peer peer, ExtendedBlock block, String file) throws IOException { if (status.getStatus() != Status.SUCCESS) { if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } else { throw new IOException("Got error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } } }
/** * Should the block access token be refetched on an exception * * @param ex Exception received * @param targetAddr Target datanode address from where exception was received * @return true if block access token has expired or invalid and it should be * refetched */ private static boolean tokenRefetchNeeded(IOException ex, InetSocketAddress targetAddr) { /* * Get a new access token and retry. Retry is needed in 2 cases. 1) When * both NN and DN re-started while DFSClient holding a cached access token. * 2) In the case that NN fails to update its access key at pre-set interval * (by a wide margin) and subsequently restarts. In this case, DN * re-registers itself with NN and receives a new access key, but DN will * delete the old access key from its memory since it's considered expired * based on the estimated expiration date. */ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { LOG.info("Access token was invalid when connecting to " + targetAddr + " : " + ex); return true; } return false; }
static void checkSuccess(BlockOpResponseProto status, Socket sock, ExtendedBlock block, String file) throws IOException { if (status.getStatus() != Status.SUCCESS) { if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error for OP_READ_BLOCK, self=" + sock.getLocalSocketAddress() + ", remote=" + sock.getRemoteSocketAddress() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } else { throw new IOException("Got error for OP_READ_BLOCK, self=" + sock.getLocalSocketAddress() + ", remote=" + sock.getRemoteSocketAddress() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } } }
/** * Should the block access token be refetched on an exception * * @param ex * Exception received * @param targetAddr * Target datanode address from where exception was received * @return true if block access token has expired or invalid and it should be * refetched */ private static boolean tokenRefetchNeeded(IOException ex, InetSocketAddress targetAddr) { /* * Get a new access token and retry. Retry is needed in 2 cases. 1) When * both NN and DN re-started while DFSClient holding a cached access token. * 2) In the case that NN fails to update its access key at pre-set interval * (by a wide margin) and subsequently restarts. In this case, DN * re-registers itself with NN and receives a new access key, but DN will * delete the old access key from its memory since it's considered expired * based on the estimated expiration date. */ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { LOG.info( "Access token was invalid when connecting to " + targetAddr + " : " + ex); return true; } return false; }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param in input stream from datanode * @param out output stream to datanode * @param lb the located block * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private static Type inferChecksumTypeByReading( String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb * the located block * @param clientName * the name of the DFSClient requesting the checksum * @param dn * the connected datanode * @return the inferred checksum type * @throws IOException * if an error occurs */ private static Type inferChecksumTypeByReading(String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream( new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out) .readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException( "Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper .convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param in input stream from datanode * @param out output stream to datanode * @param lb the located block * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private static Type inferChecksumTypeByReading( String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** * Get block location information about a list of {@link HdfsBlockLocation}. * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to * get {@link BlockStorageLocation}s for blocks returned by * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} * . * * This is done by making a round of RPCs to the associated datanodes, asking * the volume of each block replica. The returned array of * {@link BlockStorageLocation} expose this information as a * {@link VolumeId}. * * @param blockLocations * target blocks on which to query volume location information * @return volumeBlockLocations original block array augmented with additional * volume location information for each replica. */ public BlockStorageLocation[] getBlockStorageLocations( List<BlockLocation> blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { if (!getConf().getHdfsBlocksMetadataEnabled) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); } // Downcast blockLocations and fetch out required LocatedBlock(s) List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); for (BlockLocation loc : blockLocations) { if (!(loc instanceof HdfsBlockLocation)) { throw new ClassCastException("DFSClient#getVolumeBlockLocations " + "expected to be passed HdfsBlockLocations"); } HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; blocks.add(hdfsLoc.getLocatedBlock()); } // Re-group the LocatedBlocks to be grouped by datanodes, with the values // a list of the LocatedBlocks on the datanode. Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); for (LocatedBlock b : blocks) { for (DatanodeInfo info : b.getLocations()) { if (!datanodeBlocks.containsKey(info)) { datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); } List<LocatedBlock> l = datanodeBlocks.get(info); l.add(b); } } // Make RPCs to the datanodes to get volume locations for its replicas TraceScope scope = Trace.startSpan("getBlockStorageLocations", traceSampler); Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; try { metadatas = BlockStorageLocationUtil. queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); } } finally { scope.close(); } // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil .convertToVolumeBlockLocations(blocks, blockVolumeIds); return volumeBlockLocations; }
private static void tryRead(final Configuration conf, LocatedBlock lblock, boolean shouldSucceed) { InetSocketAddress targetAddr = null; IOException ioe = null; BlockReader blockReader = null; ExtendedBlock block = lblock.getBlock(); try { DatanodeInfo[] nodes = lblock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block). setBlockToken(lblock.getBlockToken()). setInetSocketAddress(targetAddr). setStartOffset(0). setLength(-1). setVerifyChecksum(true). setClientName("TestBlockTokenWithDFS"). setDatanodeInfo(nodes[0]). setCachingStrategy(CachingStrategy.newDefaultStrategy()). setClientCacheContext(ClientContext.getFromConf(conf)). setConfiguration(conf). setRemotePeerFactory(new RemotePeerFactory() { @Override public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); try { sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); peer = TcpPeerServer.peerFromSocket(sock); } finally { if (peer == null) { IOUtils.closeSocket(sock); } } return peer; } }). build(); } catch (IOException ex) { ioe = ex; } finally { if (blockReader != null) { try { blockReader.close(); } catch (IOException e) { throw new RuntimeException(e); } } } if (shouldSucceed) { Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, " + "when it is expected to be valid", blockReader); } else { Assert.assertNotNull("OP_READ_BLOCK: access token is valid, " + "when it is expected to be invalid", ioe); Assert.assertTrue( "OP_READ_BLOCK failed due to reasons other than access token: ", ioe instanceof InvalidBlockTokenException); } }
protected void tryRead(final Configuration conf, LocatedBlock lblock, boolean shouldSucceed) { InetSocketAddress targetAddr = null; IOException ioe = null; BlockReader blockReader = null; ExtendedBlock block = lblock.getBlock(); try { DatanodeInfo[] nodes = lblock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); blockReader = new BlockReaderFactory(new DfsClientConf(conf)). setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block). setBlockToken(lblock.getBlockToken()). setInetSocketAddress(targetAddr). setStartOffset(0). setLength(-1). setVerifyChecksum(true). setClientName("TestBlockTokenWithDFS"). setDatanodeInfo(nodes[0]). setCachingStrategy(CachingStrategy.newDefaultStrategy()). setClientCacheContext(ClientContext.getFromConf(conf)). setConfiguration(conf). setTracer(FsTracer.get(conf)). setRemotePeerFactory(new RemotePeerFactory() { @Override public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); try { sock.connect(addr, HdfsConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); peer = DFSUtilClient.peerFromSocket(sock); } finally { if (peer == null) { IOUtils.closeSocket(sock); } } return peer; } }). build(); } catch (IOException ex) { ioe = ex; } finally { if (blockReader != null) { try { blockReader.close(); } catch (IOException e) { throw new RuntimeException(e); } } } if (shouldSucceed) { Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, " + "when it is expected to be valid", blockReader); } else { Assert.assertNotNull("OP_READ_BLOCK: access token is valid, " + "when it is expected to be invalid", ioe); Assert.assertTrue( "OP_READ_BLOCK failed due to reasons other than access token: ", ioe instanceof InvalidBlockTokenException); } }
/** * Get block location information about a list of {@link HdfsBlockLocation}. * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to * get {@link BlockStorageLocation}s for blocks returned by * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} * . * * This is done by making a round of RPCs to the associated datanodes, asking * the volume of each block replica. The returned array of * {@link BlockStorageLocation} expose this information as a * {@link VolumeId}. * * @param blockLocations * target blocks on which to query volume location information * @return volumeBlockLocations original block array augmented with additional * volume location information for each replica. */ public BlockStorageLocation[] getBlockStorageLocations( List<BlockLocation> blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { if (!getConf().getHdfsBlocksMetadataEnabled) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); } // Downcast blockLocations and fetch out required LocatedBlock(s) List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); for (BlockLocation loc : blockLocations) { if (!(loc instanceof HdfsBlockLocation)) { throw new ClassCastException("DFSClient#getVolumeBlockLocations " + "expected to be passed HdfsBlockLocations"); } HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; blocks.add(hdfsLoc.getLocatedBlock()); } // Re-group the LocatedBlocks to be grouped by datanodes, with the values // a list of the LocatedBlocks on the datanode. Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); for (LocatedBlock b : blocks) { for (DatanodeInfo info : b.getLocations()) { if (!datanodeBlocks.containsKey(info)) { datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); } List<LocatedBlock> l = datanodeBlocks.get(info); l.add(b); } } // Make RPCs to the datanodes to get volume locations for its replicas Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); } // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil .convertToVolumeBlockLocations(blocks, blockVolumeIds); return volumeBlockLocations; }
/** * Get block location information about a list of {@link HdfsBlockLocation}. * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to * get {@link BlockStorageLocation}s for blocks returned by * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} * . * * This is done by making a round of RPCs to the associated datanodes, asking * the volume of each block replica. The returned array of * {@link BlockStorageLocation} expose this information as a * {@link VolumeId}. * * @param blockLocations * target blocks on which to query volume location information * @return volumeBlockLocations original block array augmented with additional * volume location information for each replica. */ public BlockStorageLocation[] getBlockStorageLocations( List<BlockLocation> blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { if (!getConf().getHdfsBlocksMetadataEnabled) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); } // Downcast blockLocations and fetch out required LocatedBlock(s) List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); for (BlockLocation loc : blockLocations) { if (!(loc instanceof HdfsBlockLocation)) { throw new ClassCastException("DFSClient#getVolumeBlockLocations " + "expected to be passed HdfsBlockLocations"); } HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; blocks.add(hdfsLoc.getLocatedBlock()); } // Re-group the LocatedBlocks to be grouped by datanodes, with the values // a list of the LocatedBlocks on the datanode. Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); for (LocatedBlock b : blocks) { for (DatanodeInfo info : b.getLocations()) { if (!datanodeBlocks.containsKey(info)) { datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); } List<LocatedBlock> l = datanodeBlocks.get(info); l.add(b); } } // Make RPCs to the datanodes to get volume locations for its replicas List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsTimeout, getConf().connectToDnViaHostname); // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil .convertToVolumeBlockLocations(blocks, blockVolumeIds); return volumeBlockLocations; }
private static void tryRead(Configuration conf, LocatedBlock lblock, boolean shouldSucceed) { InetSocketAddress targetAddr = null; Socket s = null; BlockReader blockReader = null; ExtendedBlock block = lblock.getBlock(); try { DatanodeInfo[] nodes = lblock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); String file = BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1, true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), nodes[0], null, null, null, false); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { assertFalse("OP_READ_BLOCK: access token is invalid, " + "when it is expected to be valid", shouldSucceed); return; } fail("OP_READ_BLOCK failed due to reasons other than access token: " + StringUtils.stringifyException(ex)); } finally { if (s != null) { try { s.close(); } catch (IOException iex) { } finally { s = null; } } } if (blockReader == null) { fail("OP_READ_BLOCK failed due to reasons other than access token"); } assertTrue("OP_READ_BLOCK: access token is valid, " + "when it is expected to be invalid", shouldSucceed); }