Example source code for the Java class org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi

Project: hadoop    File: DirectoryScanner.java
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
  this.blockId = blockId;
  String condensedVolPath = vol == null ? null :
    getCondensedPath(vol.getBasePath());
  this.blockSuffix = blockFile == null ? null :
    getSuffix(blockFile, condensedVolPath);
  this.blockFileLength = (blockFile != null) ? blockFile.length() : 0; 
  if (metaFile == null) {
    this.metaSuffix = null;
  } else if (blockFile == null) {
    this.metaSuffix = getSuffix(metaFile, condensedVolPath);
  } else {
    this.metaSuffix = getSuffix(metaFile,
        condensedVolPath + blockSuffix);
  }
  this.volume = vol;
}
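The two suffix fields hold paths relative to the condensed volume root, presumably to keep ScanInfo objects small during large scans. A minimal sketch of recovering a full path from a suffix (getFullPath is a hypothetical helper; the real ScanInfo exposes comparable accessors such as getBlockFile()):

// Sketch only: rebuild an absolute path from the volume base path and a
// stored suffix. Returns null when no file was recorded for the replica.
private static File getFullPath(String basePath, String suffix) {
  return (suffix == null) ? null : new File(basePath + suffix);
}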
Project: hadoop    File: BlockScanner.java
/**
 * Stops and removes a volume scanner.<p/>
 *
 * This function will block until the volume scanner has stopped.
 *
 * @param volume           The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
}
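Because removeVolumeScanner joins the scanner thread before returning, the caller may tear the volume down as soon as it returns. A hedged usage sketch (blockScanner and volume are illustrative names, not taken from the snippet above):

// Illustrative only.
blockScanner.removeVolumeScanner(volume); // blocks until the scanner thread exits
// Safe to close or detach the volume here: no scanner is still reading it.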
Project: hadoop    File: TestDiskError.java
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
    DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
      String dir = v.getBasePath();
      Path dataDir = new Path(dir);
      FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
        assertEquals("Permission for dir: " + dataDir + ", is " + actual +
            ", while expected is " + expected, expected, actual);
    }
  }
}
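The expected permission comes straight from the dfs.datanode.data.dir.perm key. A sketch of pinning it to a known value before starting the cluster (the "700" value is an assumption chosen for illustration):

// Hypothetical setup: force a known data-dir permission.
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, "700"); // rwx------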
Project: aliyun-oss-hadoop-fs    File: DirectoryScanner.java
/**
 * Create a ScanInfo object for a block. This constructor will examine
 * the block data and meta-data files.
 *
 * @param blockId the block ID
 * @param blockFile the path to the block data file
 * @param metaFile the path to the block meta-data file
 * @param vol the volume that contains the block
 */
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
  this.blockId = blockId;
  String condensedVolPath = vol == null ? null :
    getCondensedPath(vol.getBasePath());
  this.blockSuffix = blockFile == null ? null :
    getSuffix(blockFile, condensedVolPath);
  this.blockFileLength = (blockFile != null) ? blockFile.length() : 0; 
  if (metaFile == null) {
    this.metaSuffix = null;
  } else if (blockFile == null) {
    this.metaSuffix = getSuffix(metaFile, condensedVolPath);
  } else {
    this.metaSuffix = getSuffix(metaFile,
        condensedVolPath + blockSuffix);
  }
  this.volume = vol;
}
Project: aliyun-oss-hadoop-fs    File: BlockScanner.java
/**
 * Stops and removes a volume scanner.<p/>
 *
 * This function will block until the volume scanner has stopped.
 *
 * @param volume           The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
}
Project: aliyun-oss-hadoop-fs    File: FsDatasetImplTestUtils.java
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
  // Reload replicas from the disk.
  ReplicaMap replicaMap = new ReplicaMap(dataset);
  try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : refs) {
      FsVolumeImpl volume = (FsVolumeImpl) vol;
      volume.getVolumeMap(bpid, replicaMap, dataset.ramDiskReplicaTracker);
    }
  }

  // Cast ReplicaInfo to Replica, because ReplicaInfo assumes a file-based
  // FsVolumeSpi implementation.
  List<Replica> ret = new ArrayList<>();
  if (replicaMap.replicas(bpid) != null) {
    ret.addAll(replicaMap.replicas(bpid));
  }
  return ret.iterator();
}
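One way a test might consume the returned iterator, e.g. to assert how many replicas of a block pool survived a restart (a sketch; testUtils and bpid are assumed to be in scope):

// Sketch: count the stored replicas for one block pool.
int count = 0;
Iterator<Replica> it = testUtils.getStoredReplicas(bpid);
while (it.hasNext()) {
  it.next();
  count++;
}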
Project: aliyun-oss-hadoop-fs    File: TestWriteToReplica.java
private void createReplicas(List<String> bpList, List<FsVolumeSpi> volumes,
                            FsDatasetTestUtils testUtils) throws IOException {
  // Here we create every type of replica and add it to the volume map:
  // one ReplicaInfo of each type, under each block pool and volume.
  long id = 1; // This variable is used as both blockId and genStamp
  for (String bpId: bpList) {
    for (FsVolumeSpi volume: volumes) {
      ExtendedBlock eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createFinalizedReplica(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createRBW(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createReplicaWaitingToBeRecovered(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createReplicaInPipeline(volume, eb);
      id++;
    }
  }
}
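A hedged sketch of how the volumes argument could be gathered through the reference-counted accessor before calling createReplicas (dataset, bpid, and testUtils are illustrative names):

// Sketch: collect volumes under a FsVolumeReferences guard and create the
// replicas while the guard is open, so no volume can be removed mid-call.
try (FsDatasetSpi.FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
  List<FsVolumeSpi> volumes = new ArrayList<>();
  for (FsVolumeSpi v : refs) {
    volumes.add(v);
  }
  createReplicas(Arrays.asList(bpid), volumes, testUtils);
}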
Project: aliyun-oss-hadoop-fs    File: TestDiskError.java
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
    DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dn.getFSDataset().getFsVolumeReferences()) {
      for (FsVolumeSpi vol : volumes) {
        String dir = vol.getBasePath();
        Path dataDir = new Path(dir);
        FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
        assertEquals("Permission for dir: " + dataDir + ", is " + actual +
            ", while expected is " + expected, expected, actual);
      }
    }
  }
}
Project: big-c    File: DirectoryScanner.java
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
  this.blockId = blockId;
  String condensedVolPath = vol == null ? null :
    getCondensedPath(vol.getBasePath());
  this.blockSuffix = blockFile == null ? null :
    getSuffix(blockFile, condensedVolPath);
  this.blockFileLength = (blockFile != null) ? blockFile.length() : 0; 
  if (metaFile == null) {
    this.metaSuffix = null;
  } else if (blockFile == null) {
    this.metaSuffix = getSuffix(metaFile, condensedVolPath);
  } else {
    this.metaSuffix = getSuffix(metaFile,
        condensedVolPath + blockSuffix);
  }
  this.volume = vol;
}
Project: big-c    File: BlockScanner.java
/**
 * Stops and removes a volume scanner.<p/>
 *
 * This function will block until the volume scanner has stopped.
 *
 * @param volume           The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
}
Project: big-c    File: TestDiskError.java
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
    DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
      String dir = v.getBasePath();
      Path dataDir = new Path(dir);
      FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
        assertEquals("Permission for dir: " + dataDir + ", is " + actual +
            ", while expected is " + expected, expected, actual);
    }
  }
}
Project: hadoop    File: FsVolumeList.java
long getRemaining() throws IOException {
  long remaining = 0L;
  for (FsVolumeSpi vol : volumes.get()) {
    try (FsVolumeReference ref = vol.obtainReference()) {
      remaining += vol.getAvailable();
    } catch (ClosedChannelException e) {
      // ignore
    }
  }
  return remaining;
}
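The try-with-resources on obtainReference() is the guard against volume hot-swap. In isolation, the idiom looks like this (doSomethingWith is a placeholder):

// Hold a FsVolumeReference while touching a volume so a concurrent
// removal cannot close it underneath you.
try (FsVolumeReference ref = vol.obtainReference()) {
  doSomethingWith(ref.getVolume()); // placeholder for real work
} catch (ClosedChannelException e) {
  // The volume was closed between iteration and obtainReference(); skip it.
}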
Project: hadoop    File: DirectoryScanner.java
/** Block is not found on the disk */
private void addDifference(LinkedList<ScanInfo> diffRecord,
                           Stats statsRecord, long blockId,
                           FsVolumeSpi vol) {
  statsRecord.missingBlockFile++;
  statsRecord.missingMetaFile++;
  diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
Project: hadoop    File: DirectoryScanner.java
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FsDatasetSpi<?> dataset,
    final FsVolumeSpi volume) {
  for (FsVolumeSpi vol : dataset.getVolumes()) {
    if (vol == volume) {
      return true;
    }
  }
  return false;
}
Project: hadoop    File: DataNode.java
/**
 * Report a bad block which is hosted on the local DN.
 */
public void reportBadBlocks(ExtendedBlock block) throws IOException{
  BPOfferService bpos = getBPOSForBlock(block);
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
}
Project: hadoop    File: DataNode.java
private void reportBadBlock(final BPOfferService bpos,
    final ExtendedBlock block, final String msg) {
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
  LOG.warn(msg);
}
Project: hadoop    File: BlockScanner.java
/**
 * Set up a scanner for the given block pool and volume.
 *
 * @param ref              A reference to the volume.
 */
public synchronized void addVolumeScanner(FsVolumeReference ref) {
  boolean success = false;
  try {
    FsVolumeSpi volume = ref.getVolume();
    if (!isEnabled()) {
      LOG.debug("Not adding volume scanner for {}, because the block " +
          "scanner is disabled.", volume.getBasePath());
      return;
    }
    VolumeScanner scanner = scanners.get(volume.getStorageID());
    if (scanner != null) {
      LOG.error("Already have a scanner for volume {}.",
          volume.getBasePath());
      return;
    }
    LOG.debug("Adding scanner for volume {} (StorageID {})",
        volume.getBasePath(), volume.getStorageID());
    scanner = new VolumeScanner(conf, datanode, ref);
    scanner.start();
    scanners.put(volume.getStorageID(), scanner);
    success = true;
  } finally {
    if (!success) {
      // If we didn't create a new VolumeScanner object, we don't
      // need this reference to the volume.
      IOUtils.cleanup(null, ref);
    }
  }
}
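On success the scanner takes ownership of the reference; on failure the finally block releases it. A sketch of the add/remove pairing during a hot swap (blockScanner is an illustrative name):

// Sketch: hand the scanner its own volume reference.
FsVolumeReference ref = volume.obtainReference();
blockScanner.addVolumeScanner(ref);       // scanner now owns ref
// ... later, when the volume is being removed:
blockScanner.removeVolumeScanner(volume); // joins the scanner thread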
Project: hadoop    File: VolumeScanner.java
public void handle(ExtendedBlock block, IOException e) {
  FsVolumeSpi volume = scanner.volume;
  if (e == null) {
    LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
    return;
  }
  // If the block does not exist anymore, then it's not an error.
  if (!volume.getDataset().contains(block)) {
    LOG.debug("Volume {}: block {} is no longer in the dataset.",
        volume.getBasePath(), block);
    return;
  }
  // If the block exists, the exception may be due to a race with a write:
  // the BlockSender picked up the old rbw path, the BlockReceiver then moved
  // the block from rbw to finalized, and the BlockSender tried to open the
  // file before the BlockReceiver updated the volume map. The block's state
  // may change again by now, so ignore this error here. If a block really
  // was deleted by mistake, the DirectoryScanner should catch it.
  if (e instanceof FileNotFoundException) {
    LOG.info("Volume {}: verification failed for {} because of " +
            "FileNotFoundException.  This may be due to a race with write.",
        volume.getBasePath(), block);
    return;
  }
  LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
  try {
    scanner.datanode.reportBadBlocks(block);
  } catch (IOException ie) {
    // This is bad, but not bad enough to shut down the scanner.
    LOG.warn("Cannot report bad " + block.getBlockId(), e);
  }
}
Project: hadoop    File: TestStorageMover.java
private void setVolumeFull(DataNode dn, StorageType type) {
  List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
  for (FsVolumeSpi v : volumes) {
    FsVolumeImpl volume = (FsVolumeImpl) v;
    if (volume.getStorageType() == type) {
      LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
          + volume.getStorageID());
      volume.setCapacityForTesting(0);
    }
  }
}
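A hypothetical invocation, making every SSD volume on a DataNode report zero capacity so the mover must place replicas elsewhere:

// Illustrative usage of the helper above.
setVolumeFull(dn, StorageType.SSD);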
Project: hadoop    File: LazyPersistTestCase.java
/**
 * Make sure at least one non-transient volume has a saved copy of the replica.
 * An infinite loop is used to ensure the async lazy persist tasks are completely
 * done before verification. The caller of ensureLazyPersistBlocksAreSaved
 * expects either a successful pass or a timeout failure.
 */
protected final void ensureLazyPersistBlocksAreSaved(
    LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
  final String bpid = cluster.getNamesystem().getBlockPoolId();
  List<? extends FsVolumeSpi> volumes =
    cluster.getDataNodes().get(0).getFSDataset().getVolumes();
  final Set<Long> persistedBlockIds = new HashSet<Long>();

  while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) {
    // Take 1 second sleep before each verification iteration
    Thread.sleep(1000);

    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
      for (FsVolumeSpi v : volumes) {
        if (v.isTransientStorage()) {
          continue;
        }

        FsVolumeImpl volume = (FsVolumeImpl) v;
        File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();

        long blockId = lb.getBlock().getBlockId();
        File targetDir =
          DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
        File blockFile = new File(targetDir, lb.getBlock().getBlockName());
        if (blockFile.exists()) {
          // Found a persisted copy for this block and added to the Set
          persistedBlockIds.add(blockId);
        }
      }
    }
  }

  // We should have found a persisted copy for each located block.
  assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
Project: hadoop    File: LazyPersistTestCase.java
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
    throws IOException, InterruptedException {

  LOG.info("Verifying replica has no saved copy after deletion.");
  triggerBlockReport();

  while(
    DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
      > 0L){
    Thread.sleep(1000);
  }

  final String bpid = cluster.getNamesystem().getBlockPoolId();
  List<? extends FsVolumeSpi> volumes =
    cluster.getDataNodes().get(0).getFSDataset().getVolumes();

  // Make sure the deleted replica does not have a copy in either the
  // finalized dir of a transient volume or the lazy-persist dir of a
  // non-transient volume
  for (FsVolumeSpi v : volumes) {
    FsVolumeImpl volume = (FsVolumeImpl) v;
    File targetDir = (v.isTransientStorage()) ?
        volume.getBlockPoolSlice(bpid).getFinalizedDir() :
        volume.getBlockPoolSlice(bpid).getLazypersistDir();
    if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
      return false;
    }
  }
  return true;
}
Project: hadoop    File: TestRbwSpaceReservation.java
/**
 * Start a MiniDFSCluster for the test.
 *
 * @param blockSize block size to use in the cluster
 * @param numDatanodes number of DataNodes to start
 * @param perVolumeCapacity limit the capacity of each volume to the given
 *                          value. If negative, then don't limit.
 * @throws IOException if the cluster fails to start
 */
private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
  initConfig(blockSize);

  cluster = new MiniDFSCluster
      .Builder(conf)
      .storagesPerDatanode(STORAGES_PER_DATANODE)
      .numDataNodes(numDatanodes)
      .build();
  fs = cluster.getFileSystem();
  client = fs.getClient();
  cluster.waitActive();

  if (perVolumeCapacity >= 0) {
    for (DataNode dn : cluster.getDataNodes()) {
      for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
        ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
      }
    }
  }

  if (numDatanodes == 1) {
    List<? extends FsVolumeSpi> volumes =
        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
    assertThat(volumes.size(), is(1));
    singletonVolume = ((FsVolumeImpl) volumes.get(0));
  }
}
Project: hadoop    File: TestDataNodeHotSwapVolumes.java
/** Get the FsVolume with the given basePath. */
private FsVolumeImpl getVolume(DataNode dn, File basePath) {
  for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
    if (vol.getBasePath().equals(basePath.getPath())) {
      return (FsVolumeImpl)vol;
    }
  }
  return null;
}
Project: hadoop    File: TestDirectoryScanner.java
/**
 * Duplicate the given block on all volumes.
 * @param blockId ID of the block to duplicate
 * @throws IOException if a copy fails
 */
private void duplicateBlock(long blockId) throws IOException {
  synchronized (fds) {
    ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
    for (FsVolumeSpi v : fds.getVolumes()) {
      if (v.getStorageID().equals(b.getVolume().getStorageID())) {
        continue;
      }

      // Volume without a copy of the block. Make a copy now.
      File sourceBlock = b.getBlockFile();
      File sourceMeta = b.getMetaFile();
      String sourceRoot = b.getVolume().getBasePath();
      String destRoot = v.getBasePath();

      String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
      String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();

      File destBlock = new File(destRoot, relativeBlockPath);
      File destMeta = new File(destRoot, relativeMetaPath);

      destBlock.getParentFile().mkdirs();
      FileUtils.copyFile(sourceBlock, destBlock);
      FileUtils.copyFile(sourceMeta, destMeta);

      if (destBlock.exists() && destMeta.exists()) {
        LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
        LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
      }
    }
  }
}
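The relativize step is easiest to see with concrete paths (the paths below are made up for illustration):

// rel becomes "current/BP-1/finalized/subdir0/blk_42", so the copy lands
// at the same relative location under the destination volume root.
URI root = new File("/data/dn1").toURI();
URI blk = new File("/data/dn1/current/BP-1/finalized/subdir0/blk_42").toURI();
String rel = root.relativize(blk).getPath();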
Project: hadoop    File: TestDirectoryScanner.java
/** Create a block file in a random volume. */
private long createBlockFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
Project: hadoop    File: TestDirectoryScanner.java
/** Create a metafile in a random volume. */
private long createMetaFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
Project: aliyun-oss-hadoop-fs    File: FsVolumeList.java
long getRemaining() throws IOException {
  long remaining = 0L;
  for (FsVolumeSpi vol : volumes) {
    try (FsVolumeReference ref = vol.obtainReference()) {
      remaining += vol.getAvailable();
    } catch (ClosedChannelException e) {
      // ignore
    }
  }
  return remaining;
}
Project: big-c    File: TestDirectoryScanner.java
/** Create a block file in a random volume. */
private long createBlockFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
Project: aliyun-oss-hadoop-fs    File: DataNode.java
/**
 * Report a bad block which is hosted on the local DN.
 */
public void reportBadBlocks(ExtendedBlock block) throws IOException{
  BPOfferService bpos = getBPOSForBlock(block);
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
}
Project: aliyun-oss-hadoop-fs    File: DataNode.java
private void reportBadBlock(final BPOfferService bpos,
    final ExtendedBlock block, final String msg) {
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
  LOG.warn(msg);
}
Project: aliyun-oss-hadoop-fs    File: BlockScanner.java
/**
 * Set up a scanner for the given block pool and volume.
 *
 * @param ref              A reference to the volume.
 */
public synchronized void addVolumeScanner(FsVolumeReference ref) {
  boolean success = false;
  try {
    FsVolumeSpi volume = ref.getVolume();
    if (!isEnabled()) {
      LOG.debug("Not adding volume scanner for {}, because the block " +
          "scanner is disabled.", volume.getBasePath());
      return;
    }
    VolumeScanner scanner = scanners.get(volume.getStorageID());
    if (scanner != null) {
      LOG.error("Already have a scanner for volume {}.",
          volume.getBasePath());
      return;
    }
    LOG.debug("Adding scanner for volume {} (StorageID {})",
        volume.getBasePath(), volume.getStorageID());
    scanner = new VolumeScanner(conf, datanode, ref);
    scanner.start();
    scanners.put(volume.getStorageID(), scanner);
    success = true;
  } finally {
    if (!success) {
      // If we didn't create a new VolumeScanner object, we don't
      // need this reference to the volume.
      IOUtils.cleanup(null, ref);
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: VolumeScanner.java
public void handle(ExtendedBlock block, IOException e) {
  FsVolumeSpi volume = scanner.volume;
  if (e == null) {
    LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
    return;
  }
  // If the block does not exist anymore, then it's not an error.
  if (!volume.getDataset().contains(block)) {
    LOG.debug("Volume {}: block {} is no longer in the dataset.",
        volume.getBasePath(), block);
    return;
  }
  // If the block exists, the exception may be due to a race with a write:
  // the BlockSender picked up the old rbw path, the BlockReceiver then moved
  // the block from rbw to finalized, and the BlockSender tried to open the
  // file before the BlockReceiver updated the volume map. The block's state
  // may change again by now, so ignore this error here. If a block really
  // was deleted by mistake, the DirectoryScanner should catch it.
  if (e instanceof FileNotFoundException) {
    LOG.info("Volume {}: verification failed for {} because of " +
            "FileNotFoundException.  This may be due to a race with write.",
        volume.getBasePath(), block);
    return;
  }
  LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
  try {
    scanner.datanode.reportBadBlocks(block);
  } catch (IOException ie) {
    // This is bad, but not bad enough to shut down the scanner.
    LOG.warn("Cannot report bad " + block.getBlockId(), e);
  }
}
Project: aliyun-oss-hadoop-fs    File: TestStorageMover.java
private void setVolumeFull(DataNode dn, StorageType type) {
  try (FsDatasetSpi.FsVolumeReferences refs = dn.getFSDataset()
      .getFsVolumeReferences()) {
    for (FsVolumeSpi fvs : refs) {
      FsVolumeImpl volume = (FsVolumeImpl) fvs;
      if (volume.getStorageType() == type) {
        LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
            + volume.getStorageID());
        volume.setCapacityForTesting(0);
      }
    }
  } catch (IOException e) {
    LOG.error("Unexpected exception by closing FsVolumeReference", e);
  }
}
Project: aliyun-oss-hadoop-fs    File: LazyPersistTestCase.java
/**
 * Make sure at least one non-transient volume has a saved copy of the replica.
 * An infinite loop is used to ensure the async lazy persist tasks are completely
 * done before verification. The caller of ensureLazyPersistBlocksAreSaved
 * expects either a successful pass or a timeout failure.
 */
protected final void ensureLazyPersistBlocksAreSaved(
    LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
  final String bpid = cluster.getNamesystem().getBlockPoolId();

  final Set<Long> persistedBlockIds = new HashSet<Long>();

  try (FsDatasetSpi.FsVolumeReferences volumes =
      cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
    while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
        .size()) {
      // Take 1 second sleep before each verification iteration
      Thread.sleep(1000);

      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
        for (FsVolumeSpi v : volumes) {
          if (v.isTransientStorage()) {
            continue;
          }

          FsVolumeImpl volume = (FsVolumeImpl) v;
          File lazyPersistDir =
              volume.getBlockPoolSlice(bpid).getLazypersistDir();

          long blockId = lb.getBlock().getBlockId();
          File targetDir =
              DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
          File blockFile = new File(targetDir, lb.getBlock().getBlockName());
          if (blockFile.exists()) {
            // Found a persisted copy for this block and added to the Set
            persistedBlockIds.add(blockId);
          }
        }
      }
    }
  }

  // We should have found a persisted copy for each located block.
  assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
Project: aliyun-oss-hadoop-fs    File: LazyPersistTestCase.java
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
    throws IOException, InterruptedException {

  LOG.info("Verifying replica has no saved copy after deletion.");
  triggerBlockReport();

  while(
      cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
      > 0L){
    Thread.sleep(1000);
  }

  final String bpid = cluster.getNamesystem().getBlockPoolId();
  final FsDatasetSpi<?> dataset =
      cluster.getDataNodes().get(0).getFSDataset();

  // Make sure the deleted replica does not have a copy in either the
  // finalized dir of a transient volume or the lazy-persist dir of a
  // non-transient volume
  try (FsDatasetSpi.FsVolumeReferences volumes =
      dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : volumes) {
      FsVolumeImpl volume = (FsVolumeImpl) vol;
      File targetDir = (volume.isTransientStorage()) ?
          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
          volume.getBlockPoolSlice(bpid).getLazypersistDir();
      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
        return false;
      }
    }
  }
  return true;
}
Project: aliyun-oss-hadoop-fs    File: FsDatasetImplTestUtils.java
@Override
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
      vol.getCurrentDir().getParentFile());
  dataset.volumeMap.add(block.getBlockPoolId(), info);
  info.getBlockFile().createNewFile();
  info.getMetaFile().createNewFile();
  return info;
}
Project: aliyun-oss-hadoop-fs    File: FsDatasetImplTestUtils.java
@Override
public Replica createReplicaInPipeline(
    FsVolumeSpi volume, ExtendedBlock block) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInPipeline rip = new ReplicaInPipeline(
      block.getBlockId(), block.getGenerationStamp(), volume,
      vol.createTmpFile(
          block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
      0);
  dataset.volumeMap.add(block.getBlockPoolId(), rip);
  return rip;
}
Project: aliyun-oss-hadoop-fs    File: FsDatasetImplTestUtils.java
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaBeingWritten rbw = new ReplicaBeingWritten(
      eb.getLocalBlock(), volume,
      vol.createRbwFile(bpid, block).getParentFile(), null);
  rbw.getBlockFile().createNewFile();
  rbw.getMetaFile().createNewFile();
  dataset.volumeMap.add(bpid, rbw);
  return rbw;
}
Project: aliyun-oss-hadoop-fs    File: FsDatasetImplTestUtils.java
@Override
public Replica createReplicaWaitingToBeRecovered(
    FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaWaitingToBeRecovered rwbr =
      new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
          vol.createRbwFile(bpid, block).getParentFile());
  dataset.volumeMap.add(bpid, rwbr);
  return rwbr;
}
Project: big-c    File: TestDirectoryScanner.java
/** Create a block file and corresponding metafile in a random volume. */
private long createBlockMetaFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());

    // Create files with the same prefix as the block file but with extensions
    // chosen so that, when sorted, they appear around the meta file. This
    // exercises how DirectoryScanner handles extraneous files.
    String name1 = file.getAbsolutePath() + ".l";
    String name2 = file.getAbsolutePath() + ".n";
    file = new File(name1);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name1);
    }

    file = new File(name2);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name2);
    }

    file = new File(finalizedDir, getMetaFile(id));
    if (file.createNewFile()) {
      LOG.info("Created metafile " + file.getName());
    }
  }
  return id;
}
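For a rough picture of the resulting directory contents (names are illustrative; the real genstamp constant may differ), the sorted listing looks like:

// blk_42            block file
// blk_42.l          extraneous file, sorts next to the meta file
// blk_42.n          extraneous file
// blk_42_1001.meta  meta file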