@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, long[] blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException { List<TokenProto> tokensProtos = new ArrayList<TokenProto>(tokens.size()); for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .setBlockPoolId(blockPoolId) .addAllBlockIds(Longs.asList(blockIds)) .addAllTokens(tokensProtos) .build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blockPoolId, blockIds, volumeIds, volumeIndexes); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException { // Convert to proto objects List<ExtendedBlockProto> blocksProtos = new ArrayList<ExtendedBlockProto>(blocks.size()); List<TokenProto> tokensProtos = new ArrayList<TokenProto>(tokens.size()); for (ExtendedBlock b : blocks) { blocksProtos.add(PBHelper.convert(b)); } for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .addAllBlocks(blocksProtos) .addAllTokens(tokensProtos) .build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), volumeIds, volumeIndexes); }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException { // Convert to proto objects List<ExtendedBlockProto> blocksProtos = new ArrayList<>(blocks.size()); List<TokenProto> tokensProtos = new ArrayList<>(tokens.size()); for (ExtendedBlock b : blocks) { blocksProtos.add(PBHelper.convert(b)); } for (Token<BlockTokenIdentifier> t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() .addAllBlocks(blocksProtos).addAllTokens(tokensProtos).build(); // Send the RPC GetHdfsBlockLocationsResponseProto response; try { response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } // List of volumes in the response List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList(); List<byte[]> volumeIds = new ArrayList<>(volumeIdsByteStrings.size()); for (ByteString bs : volumeIdsByteStrings) { volumeIds.add(bs.toByteArray()); } // Array of indexes into the list of volumes, one per block List<Integer> volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}), volumeIds, volumeIndexes); }