Example source code for the Java class org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata

Project: hadoop    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  TraceScope scope =
      Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    scope.close();
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
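
The comment "handle with the Future" refers to how this Callable is normally driven: one task per datanode is submitted to a thread pool and each datanode's HdfsBlocksMetadata is collected from its Future. Below is a minimal sketch of that pattern, not the utility's actual internals; makeCallable(), numThreads, timeoutMs and datanodeBlocks are hypothetical stand-ins.

ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Map<DatanodeInfo, Future<HdfsBlocksMetadata>> futures =
    new LinkedHashMap<DatanodeInfo, Future<HdfsBlocksMetadata>>();
for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks.entrySet()) {
  // makeCallable(...) is a hypothetical factory returning a Callable like the one above
  futures.put(entry.getKey(),
      executor.submit(makeCallable(entry.getKey(), entry.getValue())));
}
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
    new LinkedHashMap<DatanodeInfo, HdfsBlocksMetadata>();
for (Map.Entry<DatanodeInfo, Future<HdfsBlocksMetadata>> e : futures.entrySet()) {
  try {
    // IOExceptions thrown inside call() surface here wrapped in ExecutionException
    metadatas.put(e.getKey(), e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS));
  } catch (ExecutionException | TimeoutException | InterruptedException ex) {
    // Skip this datanode; results from the other datanodes remain usable
  }
}
executor.shutdown();
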
Project: hadoop    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
    String bpId, long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blockIds.length != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blockIds.length; i++) {
    checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
        tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
  }

  DataNodeFaultInjector.get().getHdfsBlocksMetadata();

  return data.getHdfsBlocksMetadata(bpId, blockIds);
}
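
The guard at the top of this method means the feature is off by default and must be switched on in the datanode configuration (and, per the DFSClient example further down, on the client side as well). A small sketch of turning it on programmatically follows; the key string is the one used by the Apache Hadoop 2.x line, and the forks listed on this page may expose it under a differently named constant.

Configuration conf = new HdfsConfiguration();
// Datanode-side switch guarding DataNode#getHdfsBlocksMetadata
conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
// If the constant is available in the fork you build against, prefer it:
// conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
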
Project: big-c    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  TraceScope scope =
      Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    scope.close();
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: big-c    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
    String bpId, long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blockIds.length != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blockIds.length; i++) {
    checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
        tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
  }

  DataNodeFaultInjector.get().getHdfsBlocksMetadata();

  return data.getHdfsBlocksMetadata(bpId, blockIds);
}
Project: hadoop-2.6.0-cdh5.4.3    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
    String bpId, long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blockIds.length != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blockIds.length; i++) {
    checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
        tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
  }

  DataNodeFaultInjector.get().getHdfsBlocksMetadata();

  return data.getHdfsBlocksMetadata(bpId, blockIds);
}
Project: hadoop-plus    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hadoop-plus    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blocks.size() != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blocks.size(); i++) {
    checkBlockToken(blocks.get(i), tokens.get(i), 
        BlockTokenSecretManager.AccessMode.READ);
  }
  return data.getHdfsBlocksMetadata(blocks);
}
Project: PDHC    File: CheckerNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blocks.size() != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blocks.size(); i++) {
    checkBlockToken(blocks.get(i), tokens.get(i), 
        BlockTokenSecretManager.AccessMode.READ);
  }
  return data.getHdfsBlocksMetadata(blocks);
}
Project: FlexMap    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: FlexMap    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
    String bpId, long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
    UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
        + " is not enabled in datanode config");
  }
  if (blockIds.length != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blockIds.length; i++) {
    checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
        tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
  }

  DataNodeFaultInjector.get().getHdfsBlocksMetadata();

  return data.getHdfsBlocksMetadata(bpId, blockIds);
}
Project: hops    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil
        .createClientDatanodeProtocolProxy(datanode, configuration, timeout,
            connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hops    File: DataNode.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens)
    throws IOException, UnsupportedOperationException {
  if (!getHdfsBlockLocationsEnabled) {
    throw new UnsupportedOperationException(
        "Datanode#getHdfsBlocksMetadata " +
            " is not enabled in datanode config");
  }
  if (blocks.size() != tokens.size()) {
    throw new IOException("Differing number of blocks and tokens");
  }
  // Check access for each block
  for (int i = 0; i < blocks.size(); i++) {
    checkBlockToken(blocks.get(i), tokens.get(i),
        BlockTokenSecretManager.AccessMode.READ);
  }
  return data.getHdfsBlocksMetadata(blocks);
}
Project: hadoop-TCP    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hardfs    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hadoop-on-lustre2    File: BlockStorageLocationUtil.java
@Override
public HdfsBlocksMetadata call() throws Exception {
  HdfsBlocksMetadata metadata = null;
  // Create the RPC proxy and make the RPC
  ClientDatanodeProtocol cdp = null;
  try {
    cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
        timeout, connectToDnViaHostname);
    metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
  } catch (IOException e) {
    // Bubble this up to the caller, handle with the Future
    throw e;
  } finally {
    if (cdp != null) {
      RPC.stopProxy(cdp);
    }
  }
  return metadata;
}
Project: hadoop    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .setBlockPoolId(blockPoolId)
      .addAllBlockIds(Longs.asList(blockIds))
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blockPoolId, blockIds,
      volumeIds, volumeIndexes);
}
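
Putting this translator together with the proxy creation shown in BlockStorageLocationUtil above, a caller talks to a single datanode roughly as sketched below. The located list, blockPoolId, socketTimeoutMs and connectToDnViaHostname variables are hypothetical; in the real flow BlockStorageLocationUtil prepares them from LocatedBlock instances.

ClientDatanodeProtocol cdp = DFSUtil.createClientDatanodeProtocolProxy(
    datanode, conf, socketTimeoutMs, connectToDnViaHostname);
try {
  long[] blockIds = new long[located.size()];
  List<Token<BlockTokenIdentifier>> dnTokens =
      new ArrayList<Token<BlockTokenIdentifier>>(located.size());
  for (int i = 0; i < located.size(); i++) {
    LocatedBlock lb = located.get(i);
    blockIds[i] = lb.getBlock().getBlockId();
    dnTokens.add(lb.getBlockToken());
  }
  HdfsBlocksMetadata metadata =
      cdp.getHdfsBlocksMetadata(blockPoolId, blockIds, dnTokens);
  // metadata now describes, per block id, which volume on this datanode holds it
} finally {
  RPC.stopProxy(cdp);
}
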
Project: big-c    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .setBlockPoolId(blockPoolId)
      .addAllBlockIds(Longs.asList(blockIds))
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blockPoolId, blockIds,
      volumeIds, volumeIndexes);
}
Project: hadoop-2.6.0-cdh5.4.3    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .setBlockPoolId(blockPoolId)
      .addAllBlockIds(Longs.asList(blockIds))
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blockPoolId, blockIds,
      volumeIds, volumeIndexes);
}
Project: hadoop-plus    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
    throws IOException {
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < volumes.volumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blocks.size(); i++) {
    ExtendedBlock block = blocks.get(i);
    FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
    boolean isValid = false;
    int volumeIndex = 0;
    for (FsVolumeImpl volume : volumes.volumes) {
      // This comparison of references should be safe
      if (blockVolume == volume) {
        isValid = true;
        break;
      }
      volumeIndex++;
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      blocksVolumeIds, blocksVolumeIndexes);
}
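
The dataset implementations on this page encode each volume id as a big-endian 4-byte integer (the volume's position in the datanode's volume list) and use Integer.MAX_VALUE as the index of a block that could not be found on any volume. A short hedged sketch of reading those values back out of an HdfsBlocksMetadata instance; the metadata variable and loop names are illustrative only.

List<byte[]> volumeIds = metadata.getVolumeIds();
List<Integer> volumeIndexes = metadata.getVolumeIndexes();
for (int i = 0; i < volumeIndexes.size(); i++) {
  int idx = volumeIndexes.get(i);
  if (idx == Integer.MAX_VALUE || idx >= volumeIds.size()) {
    // Block i was not found in any data directory on this datanode
    continue;
  }
  // Decode the 4-byte id written with ByteBuffer.allocate(4).putInt(i) above
  int volumeNumber = ByteBuffer.wrap(volumeIds.get(idx)).getInt();
  // volumeNumber is the position of the block's volume in the datanode's volume list
}
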
Project: hadoop-plus    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  // Convert to proto objects
  List<ExtendedBlockProto> blocksProtos = 
      new ArrayList<ExtendedBlockProto>(blocks.size());
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (ExtendedBlock b : blocks) {
    blocksProtos.add(PBHelper.convert(b));
  }
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .addAllBlocks(blocksProtos)
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      volumeIds, volumeIndexes);
}
Project: FlexMap    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
    long[] blockIds) throws IOException {
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < volumes.volumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blockIds.length; i++) {
    long blockId = blockIds[i];
    boolean isValid = false;

    ReplicaInfo info = volumeMap.get(poolId, blockId);
    int volumeIndex = 0;
    if (info != null) {
      FsVolumeSpi blockVolume = info.getVolume();
      for (FsVolumeImpl volume : volumes.volumes) {
        // This comparison of references should be safe
        if (blockVolume == volume) {
          isValid = true;
          break;
        }
        volumeIndex++;
      }
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(poolId, blockIds,
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: FlexMap    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .setBlockPoolId(blockPoolId)
      .addAllBlockIds(Longs.asList(blockIds))
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blockPoolId, blockIds,
      volumeIds, volumeIndexes);
}
Project: hops    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
    throws IOException {
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds =
      new ArrayList<>(volumes.volumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<>(blocks.size());
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < volumes.volumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (ExtendedBlock block : blocks) {
    FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
    boolean isValid = false;
    int volumeIndex = 0;
    for (FsVolumeImpl volume : volumes.volumes) {
      // This comparison of references should be safe
      if (blockVolume == volume) {
        isValid = true;
        break;
      }
      volumeIndex++;
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}),
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: hops    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  // Convert to proto objects
  List<ExtendedBlockProto> blocksProtos =
      new ArrayList<>(blocks.size());
  List<TokenProto> tokensProtos = new ArrayList<>(tokens.size());
  for (ExtendedBlock b : blocks) {
    blocksProtos.add(PBHelper.convert(b));
  }
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request =
      GetHdfsBlockLocationsRequestProto.newBuilder()
          .addAllBlocks(blocksProtos).addAllTokens(tokensProtos).build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[]{}),
      volumeIds, volumeIndexes);
}
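
In these older branches (hops, hadoop-plus, hadoop-TCP, hardfs) the protocol takes the blocks themselves rather than a block-pool id plus block ids, so the caller passes the ExtendedBlock and token straight out of each LocatedBlock. A brief hedged sketch, with the located list and the cdp proxy as hypothetical stand-ins:

List<ExtendedBlock> blocks = new ArrayList<ExtendedBlock>(located.size());
List<Token<BlockTokenIdentifier>> dnTokens =
    new ArrayList<Token<BlockTokenIdentifier>>(located.size());
for (LocatedBlock lb : located) {
  blocks.add(lb.getBlock());
  dnTokens.add(lb.getBlockToken());
}
HdfsBlocksMetadata metadata = cdp.getHdfsBlocksMetadata(blocks, dnTokens);
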
Project: hadoop-TCP    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
    throws IOException {
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < volumes.volumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blocks.size(); i++) {
    ExtendedBlock block = blocks.get(i);
    FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
    boolean isValid = false;
    int volumeIndex = 0;
    for (FsVolumeImpl volume : volumes.volumes) {
      // This comparison of references should be safe
      if (blockVolume == volume) {
        isValid = true;
        break;
      }
      volumeIndex++;
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: hadoop-TCP    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  // Convert to proto objects
  List<ExtendedBlockProto> blocksProtos = 
      new ArrayList<ExtendedBlockProto>(blocks.size());
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (ExtendedBlock b : blocks) {
    blocksProtos.add(PBHelper.convert(b));
  }
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .addAllBlocks(blocksProtos)
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      volumeIds, volumeIndexes);
}
Project: hardfs    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
    throws IOException {
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < volumes.volumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blocks.size(); i++) {
    ExtendedBlock block = blocks.get(i);
    FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
    boolean isValid = false;
    int volumeIndex = 0;
    for (FsVolumeImpl volume : volumes.volumes) {
      // This comparison of references should be safe
      if (blockVolume == volume) {
        isValid = true;
        break;
      }
      volumeIndex++;
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: hardfs    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  // Convert to proto objects
  List<ExtendedBlockProto> blocksProtos = 
      new ArrayList<ExtendedBlockProto>(blocks.size());
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (ExtendedBlock b : blocks) {
    blocksProtos.add(PBHelper.convert(b));
  }
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .addAllBlocks(blocksProtos)
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
      volumeIds, volumeIndexes);
}
Project: hadoop-on-lustre2    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    long[] blockIds,
    List<Token<BlockTokenIdentifier>> tokens) throws IOException {
  List<TokenProto> tokensProtos = 
      new ArrayList<TokenProto>(tokens.size());
  for (Token<BlockTokenIdentifier> t : tokens) {
    tokensProtos.add(PBHelper.convert(t));
  }
  // Build the request
  GetHdfsBlockLocationsRequestProto request = 
      GetHdfsBlockLocationsRequestProto.newBuilder()
      .setBlockPoolId(blockPoolId)
      .addAllBlockIds(Longs.asList(blockIds))
      .addAllTokens(tokensProtos)
      .build();
  // Send the RPC
  GetHdfsBlockLocationsResponseProto response;
  try {
    response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
  // List of volumes in the response
  List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
  List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
  for (ByteString bs : volumeIdsByteStrings) {
    volumeIds.add(bs.toByteArray());
  }
  // Array of indexes into the list of volumes, one per block
  List<Integer> volumeIndexes = response.getVolumeIndexesList();
  // Parsed HdfsVolumeId values, one per block
  return new HdfsBlocksMetadata(blockPoolId, blockIds,
      volumeIds, volumeIndexes);
}
Project: hadoop    File: DFSClient.java
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
 * .
 * 
 * This is done by making a round of RPCs to the associated datanodes, asking
 * the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} expose this information as a
 * {@link VolumeId}.
 * 
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for its replicas
  TraceScope scope =
    Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            getConf().getFileBlockStorageLocationsNumThreads,
            getConf().getFileBlockStorageLocationsTimeoutMs,
            getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: "
          + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
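
For completeness, the usual way to reach this code path is through DistributedFileSystem rather than DFSClient directly. The following is a hedged end-to-end sketch against the 2.x-era API shown on this page (the path name is made up); later releases deprecated this entry point.

DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
FileStatus status = dfs.getFileStatus(new Path("/tmp/example.txt"));  // hypothetical path
BlockLocation[] locs = dfs.getFileBlockLocations(status, 0, status.getLen());
BlockStorageLocation[] storageLocs =
    dfs.getFileBlockStorageLocations(Arrays.asList(locs));
for (BlockStorageLocation loc : storageLocs) {
  VolumeId[] volumeIds = loc.getVolumeIds();  // one VolumeId per replica of the block
  System.out.println(loc.getOffset() + " -> " + Arrays.toString(volumeIds));
}
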
Project: hadoop    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
    long[] blockIds) throws IOException {
  List<FsVolumeImpl> curVolumes = getVolumes();
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < curVolumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blockIds.length; i++) {
    long blockId = blockIds[i];
    boolean isValid = false;

    ReplicaInfo info = volumeMap.get(poolId, blockId);
    int volumeIndex = 0;
    if (info != null) {
      FsVolumeSpi blockVolume = info.getVolume();
      for (FsVolumeImpl volume : curVolumes) {
        // This comparison of references should be safe
        if (blockVolume == volume) {
          isValid = true;
          break;
        }
        volumeIndex++;
      }
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(poolId, blockIds,
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: hadoop    File: SimulatedFSDataset.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
    throws IOException {
  throw new UnsupportedOperationException();
}
Project: hadoop    File: ExternalDatasetImpl.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException {
  return new HdfsBlocksMetadata(null, null, null, null);
}
Project: big-c    File: DFSClient.java
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
 * .
 * 
 * This is done by making a round of RPCs to the associated datanodes, asking
 * the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} expose this information as a
 * {@link VolumeId}.
 * 
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for its replicas
  TraceScope scope =
    Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            getConf().getFileBlockStorageLocationsNumThreads,
            getConf().getFileBlockStorageLocationsTimeoutMs,
            getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: "
          + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
Project: big-c    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
    long[] blockIds) throws IOException {
  List<FsVolumeImpl> curVolumes = getVolumes();
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < curVolumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blockIds.length; i++) {
    long blockId = blockIds[i];
    boolean isValid = false;

    ReplicaInfo info = volumeMap.get(poolId, blockId);
    int volumeIndex = 0;
    if (info != null) {
      FsVolumeSpi blockVolume = info.getVolume();
      for (FsVolumeImpl volume : curVolumes) {
        // This comparison of references should be safe
        if (blockVolume == volume) {
          isValid = true;
          break;
        }
        volumeIndex++;
      }
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(poolId, blockIds,
      blocksVolumeIds, blocksVolumeIndexes);
}
Project: big-c    File: SimulatedFSDataset.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
    throws IOException {
  throw new UnsupportedOperationException();
}
Project: big-c    File: ExternalDatasetImpl.java
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException {
  return new HdfsBlocksMetadata(null, null, null, null);
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSClient.java
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
 * .
 * 
 * This is done by making a round of RPCs to the associated datanodes, asking
 * the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} expose this information as a
 * {@link VolumeId}.
 * 
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for its replicas
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
      .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
          getConf().getFileBlockStorageLocationsNumThreads,
          getConf().getFileBlockStorageLocationsTimeoutMs,
          getConf().connectToDnViaHostname);

  if (LOG.isTraceEnabled()) {
    LOG.trace("metadata returned: "
        + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
Project: hadoop-2.6.0-cdh5.4.3    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
    long[] blockIds) throws IOException {
  List<FsVolumeImpl> curVolumes = getVolumes();
  // List of VolumeIds, one per volume on the datanode
  List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
  // List of indexes into the list of VolumeIds, pointing at the VolumeId of
  // the volume that the block is on
  List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
  // Initialize the list of VolumeIds simply by enumerating the volumes
  for (int i = 0; i < curVolumes.size(); i++) {
    blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
  }
  // Determine the index of the VolumeId of each block's volume, by comparing 
  // the block's volume against the enumerated volumes
  for (int i = 0; i < blockIds.length; i++) {
    long blockId = blockIds[i];
    boolean isValid = false;

    ReplicaInfo info = volumeMap.get(poolId, blockId);
    int volumeIndex = 0;
    if (info != null) {
      FsVolumeSpi blockVolume = info.getVolume();
      for (FsVolumeImpl volume : curVolumes) {
        // This comparison of references should be safe
        if (blockVolume == volume) {
          isValid = true;
          break;
        }
        volumeIndex++;
      }
    }
    // Indicates that the block is not present, or not found in a data dir
    if (!isValid) {
      volumeIndex = Integer.MAX_VALUE;
    }
    blocksVolumeIndexes.add(volumeIndex);
  }
  return new HdfsBlocksMetadata(poolId, blockIds,
      blocksVolumeIds, blocksVolumeIndexes);
}