@Override public HdfsBlocksMetadata call() throws Exception { HdfsBlocksMetadata metadata = null; // Create the RPC proxy and make the RPC ClientDatanodeProtocol cdp = null; TraceScope scope = Trace.startSpan("getHdfsBlocksMetadata", parentSpan); try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; } finally { scope.close(); if (cdp != null) { RPC.stopProxy(cdp); } } return metadata; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata( String bpId, long[] blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blockIds.length != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blockIds.length; i++) { checkBlockToken(new ExtendedBlock(bpId, blockIds[i]), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } DataNodeFaultInjector.get().getHdfsBlocksMetadata(); return data.getHdfsBlocksMetadata(bpId, blockIds); }
@Override public HdfsBlocksMetadata call() throws Exception { HdfsBlocksMetadata metadata = null; // Create the RPC proxy and make the RPC ClientDatanodeProtocol cdp = null; try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; } finally { if (cdp != null) { RPC.stopProxy(cdp); } } return metadata; }
@Override public HdfsBlocksMetadata call() throws Exception { HdfsBlocksMetadata metadata = null; // Create the RPC proxy and make the RPC ClientDatanodeProtocol cdp = null; try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; } finally { if (cdp != null) { RPC.stopProxy(cdp); } } return metadata; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blocks.size() != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blocks.size(); i++) { checkBlockToken(blocks.get(i), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } return data.getHdfsBlocksMetadata(blocks); }
@Override public HdfsBlocksMetadata call() throws Exception { HdfsBlocksMetadata metadata = null; // Create the RPC proxy and make the RPC ClientDatanodeProtocol cdp = null; try { cdp = DFSUtil .createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; } finally { if (cdp != null) { RPC.stopProxy(cdp); } } return metadata; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException( "Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blocks.size() != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blocks.size(); i++) { checkBlockToken(blocks.get(i), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } return data.getHdfsBlocksMetadata(blocks); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, long[] blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException { List<TokenProto> tokensProtos = new ArrayList<TokenProto>(tokens.size()); for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .setBlockPoolId(blockPoolId) .addAllBlockIds(Longs.asList(blockIds)) .addAllTokens(tokensProtos) .build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blockPoolId, blockIds, volumeIds, volumeIndexes); }
@Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) throws IOException { // List of VolumeIds, one per volume on the datanode List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size()); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < volumes.volumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes for (int i = 0; i < blocks.size(); i++) { ExtendedBlock block = blocks.get(i); FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume(); boolean isValid = false; int volumeIndex = 0; for (FsVolumeImpl volume : volumes.volumes) { // This comparison of references should be safe if (blockVolume == volume) { isValid = true; break; } volumeIndex++; } // Indicates that the block is not present, or not found in a data dir if (!isValid) { volumeIndex = Integer.MAX_VALUE; } blocksVolumeIndexes.add(volumeIndex); } return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), blocksVolumeIds, blocksVolumeIndexes); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException { // Convert to proto objects List<ExtendedBlockProto> blocksProtos = new ArrayList<ExtendedBlockProto>(blocks.size()); List<TokenProto> tokensProtos = new ArrayList<TokenProto>(tokens.size()); for (ExtendedBlock b : blocks) { blocksProtos.add(PBHelper.convert(b)); } for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .addAllBlocks(blocksProtos) .addAllTokens(tokensProtos) .build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), volumeIds, volumeIndexes); }
@Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, long[] blockIds) throws IOException { // List of VolumeIds, one per volume on the datanode List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < volumes.volumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes for (int i = 0; i < blockIds.length; i++) { long blockId = blockIds[i]; boolean isValid = false; ReplicaInfo info = volumeMap.get(poolId, blockId); int volumeIndex = 0; if (info != null) { FsVolumeSpi blockVolume = info.getVolume(); for (FsVolumeImpl volume : volumes.volumes) { // This comparison of references should be safe if (blockVolume == volume) { isValid = true; break; } volumeIndex++; } } // Indicates that the block is not present, or not found in a data dir if (!isValid) { volumeIndex = Integer.MAX_VALUE; } blocksVolumeIndexes.add(volumeIndex); } return new HdfsBlocksMetadata(poolId, blockIds, blocksVolumeIds, blocksVolumeIndexes); }
@Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) throws IOException { // List of VolumeIds, one per volume on the datanode List<byte[]> blocksVolumeIds = new ArrayList<>(volumes.volumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on List<Integer> blocksVolumeIndexes = new ArrayList<>(blocks.size()); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < volumes.volumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes for (ExtendedBlock block : blocks) { FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume(); boolean isValid = false; int volumeIndex = 0; for (FsVolumeImpl volume : volumes.volumes) { // This comparison of references should be safe if (blockVolume == volume) { isValid = true; break; } volumeIndex++; } // Indicates that the block is not present, or not found in a data dir if (!isValid) { volumeIndex = Integer.MAX_VALUE; } blocksVolumeIndexes.add(volumeIndex); } return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}), blocksVolumeIds, blocksVolumeIndexes); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException { // Convert to proto objects List<ExtendedBlockProto> blocksProtos = new ArrayList<>(blocks.size()); List<TokenProto> tokensProtos = new ArrayList<>(tokens.size()); for (ExtendedBlock b : blocks) { blocksProtos.add(PBHelper.convert(b)); } for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .addAllBlocks(blocksProtos).addAllTokens(tokensProtos).build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}), volumeIds, volumeIndexes); }
/** * Get block location information about a list of {@link HdfsBlockLocation}. * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to * get {@link BlockStorageLocation}s for blocks returned by * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} * . * * This is done by making a round of RPCs to the associated datanodes, asking * the volume of each block replica. The returned array of * {@link BlockStorageLocation} expose this information as a * {@link VolumeId}. * * @param blockLocations * target blocks on which to query volume location information * @return volumeBlockLocations original block array augmented with additional * volume location information for each replica. */ public BlockStorageLocation[] getBlockStorageLocations( List<BlockLocation> blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { if (!getConf().getHdfsBlocksMetadataEnabled) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); } // Downcast blockLocations and fetch out required LocatedBlock(s) List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); for (BlockLocation loc : blockLocations) { if (!(loc instanceof HdfsBlockLocation)) { throw new ClassCastException("DFSClient#getVolumeBlockLocations " + "expected to be passed HdfsBlockLocations"); } HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; blocks.add(hdfsLoc.getLocatedBlock()); } // Re-group the LocatedBlocks to be grouped by datanodes, with the values // a list of the LocatedBlocks on the datanode. Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); for (LocatedBlock b : blocks) { for (DatanodeInfo info : b.getLocations()) { if (!datanodeBlocks.containsKey(info)) { datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); } List<LocatedBlock> l = datanodeBlocks.get(info); l.add(b); } } // Make RPCs to the datanodes to get volume locations for its replicas TraceScope scope = Trace.startSpan("getBlockStorageLocations", traceSampler); Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; try { metadatas = BlockStorageLocationUtil. queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); } } finally { scope.close(); } // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil .convertToVolumeBlockLocations(blocks, blockVolumeIds); return volumeBlockLocations; }
@Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, long[] blockIds) throws IOException { List<FsVolumeImpl> curVolumes = getVolumes(); // List of VolumeIds, one per volume on the datanode List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < curVolumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes for (int i = 0; i < blockIds.length; i++) { long blockId = blockIds[i]; boolean isValid = false; ReplicaInfo info = volumeMap.get(poolId, blockId); int volumeIndex = 0; if (info != null) { FsVolumeSpi blockVolume = info.getVolume(); for (FsVolumeImpl volume : curVolumes) { // This comparison of references should be safe if (blockVolume == volume) { isValid = true; break; } volumeIndex++; } } // Indicates that the block is not present, or not found in a data dir if (!isValid) { volumeIndex = Integer.MAX_VALUE; } blocksVolumeIndexes.add(volumeIndex); } return new HdfsBlocksMetadata(poolId, blockIds, blocksVolumeIds, blocksVolumeIndexes); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException { throw new UnsupportedOperationException(); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException { return new HdfsBlocksMetadata(null, null, null, null); }
/** * Get block location information about a list of {@link HdfsBlockLocation}. * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to * get {@link BlockStorageLocation}s for blocks returned by * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} * . * * This is done by making a round of RPCs to the associated datanodes, asking * the volume of each block replica. The returned array of * {@link BlockStorageLocation} expose this information as a * {@link VolumeId}. * * @param blockLocations * target blocks on which to query volume location information * @return volumeBlockLocations original block array augmented with additional * volume location information for each replica. */ public BlockStorageLocation[] getBlockStorageLocations( List<BlockLocation> blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { if (!getConf().getHdfsBlocksMetadataEnabled) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); } // Downcast blockLocations and fetch out required LocatedBlock(s) List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); for (BlockLocation loc : blockLocations) { if (!(loc instanceof HdfsBlockLocation)) { throw new ClassCastException("DFSClient#getVolumeBlockLocations " + "expected to be passed HdfsBlockLocations"); } HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; blocks.add(hdfsLoc.getLocatedBlock()); } // Re-group the LocatedBlocks to be grouped by datanodes, with the values // a list of the LocatedBlocks on the datanode. Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); for (LocatedBlock b : blocks) { for (DatanodeInfo info : b.getLocations()) { if (!datanodeBlocks.containsKey(info)) { datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); } List<LocatedBlock> l = datanodeBlocks.get(info); l.add(b); } } // Make RPCs to the datanodes to get volume locations for its replicas Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); } // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil .convertToVolumeBlockLocations(blocks, blockVolumeIds); return volumeBlockLocations; }