ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
  this.blockId = blockId;
  String condensedVolPath = vol == null ? null :
      getCondensedPath(vol.getBasePath());
  this.blockSuffix = blockFile == null ? null :
      getSuffix(blockFile, condensedVolPath);
  this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
  if (metaFile == null) {
    this.metaSuffix = null;
  } else if (blockFile == null) {
    this.metaSuffix = getSuffix(metaFile, condensedVolPath);
  } else {
    this.metaSuffix = getSuffix(metaFile,
        condensedVolPath + blockSuffix);
  }
  this.volume = vol;
}
/**
 * Stops and removes a volume scanner.<p/>
 *
 * This function will block until the volume scanner has stopped.
 *
 * @param volume           The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
}
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
      String dir = v.getBasePath();
      Path dataDir = new Path(dir);
      FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
      assertEquals("Permission for dir: " + dataDir + ", is " + actual +
          ", while expected is " + expected, expected, actual);
    }
  }
}
/**
 * Create a ScanInfo object for a block. This constructor will examine
 * the block data and meta-data files.
 *
 * @param blockId the block ID
 * @param blockFile the path to the block data file
 * @param metaFile the path to the block meta-data file
 * @param vol the volume that contains the block
 */
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
  this.blockId = blockId;
  String condensedVolPath = vol == null ? null :
      getCondensedPath(vol.getBasePath());
  this.blockSuffix = blockFile == null ? null :
      getSuffix(blockFile, condensedVolPath);
  this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
  if (metaFile == null) {
    this.metaSuffix = null;
  } else if (blockFile == null) {
    this.metaSuffix = getSuffix(metaFile, condensedVolPath);
  } else {
    this.metaSuffix = getSuffix(metaFile,
        condensedVolPath + blockSuffix);
  }
  this.volume = vol;
}
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
  // Reload replicas from the disk.
  ReplicaMap replicaMap = new ReplicaMap(dataset);
  try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : refs) {
      FsVolumeImpl volume = (FsVolumeImpl) vol;
      volume.getVolumeMap(bpid, replicaMap, dataset.ramDiskReplicaTracker);
    }
  }

  // Cast ReplicaInfo to Replica, because ReplicaInfo assumes a file-based
  // FsVolumeSpi implementation.
  List<Replica> ret = new ArrayList<>();
  if (replicaMap.replicas(bpid) != null) {
    ret.addAll(replicaMap.replicas(bpid));
  }
  return ret.iterator();
}
private void createReplicas(List<String> bpList, List<FsVolumeSpi> volumes,
    FsDatasetTestUtils testUtils) throws IOException {
  // Create every type of replica and add it to the volume map:
  // one ReplicaInfo of each type per block pool on each volume.
  long id = 1; // This variable is used as both blockId and genStamp
  for (String bpId : bpList) {
    for (FsVolumeSpi volume : volumes) {
      ExtendedBlock eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createFinalizedReplica(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createRBW(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createReplicaWaitingToBeRecovered(volume, eb);
      id++;

      eb = new ExtendedBlock(bpId, id, 1, id);
      testUtils.createReplicaInPipeline(volume, eb);
      id++;
    }
  }
}
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dn.getFSDataset().getFsVolumeReferences()) {
      for (FsVolumeSpi vol : volumes) {
        String dir = vol.getBasePath();
        Path dataDir = new Path(dir);
        FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
        assertEquals("Permission for dir: " + dataDir + ", is " + actual +
            ", while expected is " + expected, expected, actual);
      }
    }
  }
}
long getRemaining() throws IOException {
  long remaining = 0L;
  for (FsVolumeSpi vol : volumes.get()) {
    try (FsVolumeReference ref = vol.obtainReference()) {
      remaining += vol.getAvailable();
    } catch (ClosedChannelException e) {
      // ignore
    }
  }
  return remaining;
}
/** Block is not found on the disk. */
private void addDifference(LinkedList<ScanInfo> diffRecord,
    Stats statsRecord, long blockId,
    FsVolumeSpi vol) {
  statsRecord.missingBlockFile++;
  statsRecord.missingMetaFile++;
  diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FsDatasetSpi<?> dataset,
    final FsVolumeSpi volume) {
  for (FsVolumeSpi vol : dataset.getVolumes()) {
    if (vol == volume) {
      return true;
    }
  }
  return false;
}
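// Hedged sketch, not from the source: the same validity check expressed
// against the reference-counted FsDatasetSpi#getFsVolumeReferences() API used
// elsewhere in this section. The helper name isValidByReferences is
// hypothetical, and the throws clause is assumed because closing the
// FsVolumeReferences can raise an IOException.
private static boolean isValidByReferences(final FsDatasetSpi<?> dataset,
    final FsVolumeSpi volume) throws IOException {
  try (FsDatasetSpi.FsVolumeReferences refs =
      dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : refs) {
      if (vol == volume) {
        return true;
      }
    }
  }
  return false;
}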
/**
 * Report a bad block which is hosted on the local DN.
 */
public void reportBadBlocks(ExtendedBlock block) throws IOException {
  BPOfferService bpos = getBPOSForBlock(block);
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
}
private void reportBadBlock(final BPOfferService bpos,
    final ExtendedBlock block, final String msg) {
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
  LOG.warn(msg);
}
/**
 * Set up a scanner for the given block pool and volume.
 *
 * @param ref              A reference to the volume.
 */
public synchronized void addVolumeScanner(FsVolumeReference ref) {
  boolean success = false;
  try {
    FsVolumeSpi volume = ref.getVolume();
    if (!isEnabled()) {
      LOG.debug("Not adding volume scanner for {}, because the block " +
          "scanner is disabled.", volume.getBasePath());
      return;
    }
    VolumeScanner scanner = scanners.get(volume.getStorageID());
    if (scanner != null) {
      LOG.error("Already have a scanner for volume {}.",
          volume.getBasePath());
      return;
    }
    LOG.debug("Adding scanner for volume {} (StorageID {})",
        volume.getBasePath(), volume.getStorageID());
    scanner = new VolumeScanner(conf, datanode, ref);
    scanner.start();
    scanners.put(volume.getStorageID(), scanner);
    success = true;
  } finally {
    if (!success) {
      // If we didn't create a new VolumeScanner object, we don't
      // need this reference to the volume.
      IOUtils.cleanup(null, ref);
    }
  }
}
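// Hedged usage sketch, not from the source: addVolumeScanner takes ownership
// of the FsVolumeReference obtained from the volume; if no scanner is created,
// the finally block above releases it. The helper name startScannerFor is
// hypothetical, and the enclosing class of addVolumeScanner is assumed to be
// the BlockScanner shown here.
void startScannerFor(BlockScanner blockScanner, FsVolumeSpi vol)
    throws IOException {
  // obtainReference() may throw ClosedChannelException if the volume is gone.
  FsVolumeReference ref = vol.obtainReference();
  // The scanner (or its cleanup path) now owns the reference.
  blockScanner.addVolumeScanner(ref);
}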
public void handle(ExtendedBlock block, IOException e) {
  FsVolumeSpi volume = scanner.volume;
  if (e == null) {
    LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
    return;
  }
  // If the block does not exist anymore, then it's not an error.
  if (!volume.getDataset().contains(block)) {
    LOG.debug("Volume {}: block {} is no longer in the dataset.",
        volume.getBasePath(), block);
    return;
  }
  // If the block exists, the exception may be due to a race with write:
  // The BlockSender got an old block path in rbw. BlockReceiver removed
  // the rbw block from rbw to finalized but BlockSender tried to open the
  // file before BlockReceiver updated the VolumeMap. The state of the
  // block can be changed again now, so ignore this error here. If there
  // is a block really deleted by mistake, DirectoryScan should catch it.
  if (e instanceof FileNotFoundException) {
    LOG.info("Volume {}: verification failed for {} because of " +
        "FileNotFoundException. This may be due to a race with write.",
        volume.getBasePath(), block);
    return;
  }
  LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
  try {
    scanner.datanode.reportBadBlocks(block);
  } catch (IOException ie) {
    // This is bad, but not bad enough to shut down the scanner.
    LOG.warn("Cannot report bad " + block.getBlockId(), e);
  }
}
private void setVolumeFull(DataNode dn, StorageType type) {
  List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
  for (FsVolumeSpi v : volumes) {
    FsVolumeImpl volume = (FsVolumeImpl) v;
    if (volume.getStorageType() == type) {
      LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]" +
          volume.getStorageID());
      volume.setCapacityForTesting(0);
    }
  }
}
/**
 * Make sure at least one non-transient volume has a saved copy of the replica.
 * An infinite loop is used to ensure the async lazy persist tasks are completely
 * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
 * either a successful pass or timeout failure.
 */
protected final void ensureLazyPersistBlocksAreSaved(
    LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
  final String bpid = cluster.getNamesystem().getBlockPoolId();
  List<? extends FsVolumeSpi> volumes =
      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
  final Set<Long> persistedBlockIds = new HashSet<Long>();

  while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) {
    // Take 1 second sleep before each verification iteration
    Thread.sleep(1000);

    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
      for (FsVolumeSpi v : volumes) {
        if (v.isTransientStorage()) {
          continue;
        }

        FsVolumeImpl volume = (FsVolumeImpl) v;
        File lazyPersistDir =
            volume.getBlockPoolSlice(bpid).getLazypersistDir();

        long blockId = lb.getBlock().getBlockId();
        File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
        File blockFile = new File(targetDir, lb.getBlock().getBlockName());
        if (blockFile.exists()) {
          // Found a persisted copy for this block and added to the Set
          persistedBlockIds.add(blockId);
        }
      }
    }
  }

  // We should have found a persisted copy for each located block.
  assertThat(persistedBlockIds.size(),
      is(locatedBlocks.getLocatedBlocks().size()));
}
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
    throws IOException, InterruptedException {

  LOG.info("Verifying replica has no saved copy after deletion.");
  triggerBlockReport();

  while (DataNodeTestUtils.getPendingAsyncDeletions(
      cluster.getDataNodes().get(0)) > 0L) {
    Thread.sleep(1000);
  }

  final String bpid = cluster.getNamesystem().getBlockPoolId();
  List<? extends FsVolumeSpi> volumes =
      cluster.getDataNodes().get(0).getFSDataset().getVolumes();

  // Make sure the deleted replica does not have a copy on either the
  // finalized dir of a transient volume or the lazy-persist dir of a
  // non-transient volume.
  for (FsVolumeSpi v : volumes) {
    FsVolumeImpl volume = (FsVolumeImpl) v;
    File targetDir = (v.isTransientStorage()) ?
        volume.getBlockPoolSlice(bpid).getFinalizedDir() :
        volume.getBlockPoolSlice(bpid).getLazypersistDir();
    if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
      return false;
    }
  }
  return true;
}
/**
 * Start a MiniDFSCluster for testing.
 *
 * @param blockSize
 * @param numDatanodes number of DataNodes to start.
 * @param perVolumeCapacity limit the capacity of each volume to the given
 *                          value. If negative, then don't limit.
 * @throws IOException
 */
private void startCluster(int blockSize, int numDatanodes,
    long perVolumeCapacity) throws IOException {
  initConfig(blockSize);

  cluster = new MiniDFSCluster
      .Builder(conf)
      .storagesPerDatanode(STORAGES_PER_DATANODE)
      .numDataNodes(numDatanodes)
      .build();
  fs = cluster.getFileSystem();
  client = fs.getClient();
  cluster.waitActive();

  if (perVolumeCapacity >= 0) {
    for (DataNode dn : cluster.getDataNodes()) {
      for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
        ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
      }
    }
  }

  if (numDatanodes == 1) {
    List<? extends FsVolumeSpi> volumes =
        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
    assertThat(volumes.size(), is(1));
    singletonVolume = ((FsVolumeImpl) volumes.get(0));
  }
}
/** Get the FsVolume on the given basePath. */
private FsVolumeImpl getVolume(DataNode dn, File basePath) {
  for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
    if (vol.getBasePath().equals(basePath.getPath())) {
      return (FsVolumeImpl) vol;
    }
  }
  return null;
}
/**
 * Duplicate the given block on all volumes.
 * @param blockId
 * @throws IOException
 */
private void duplicateBlock(long blockId) throws IOException {
  synchronized (fds) {
    ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
    for (FsVolumeSpi v : fds.getVolumes()) {
      if (v.getStorageID().equals(b.getVolume().getStorageID())) {
        continue;
      }

      // Volume without a copy of the block. Make a copy now.
      File sourceBlock = b.getBlockFile();
      File sourceMeta = b.getMetaFile();
      String sourceRoot = b.getVolume().getBasePath();
      String destRoot = v.getBasePath();

      String relativeBlockPath =
          new File(sourceRoot).toURI().relativize(sourceBlock.toURI())
              .getPath();
      String relativeMetaPath =
          new File(sourceRoot).toURI().relativize(sourceMeta.toURI())
              .getPath();

      File destBlock = new File(destRoot, relativeBlockPath);
      File destMeta = new File(destRoot, relativeMetaPath);

      destBlock.getParentFile().mkdirs();
      FileUtils.copyFile(sourceBlock, destBlock);
      FileUtils.copyFile(sourceMeta, destMeta);

      if (destBlock.exists() && destMeta.exists()) {
        LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
        LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
      }
    }
  }
}
/** Create a block file in a random volume. */
private long createBlockFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
/** Create a metafile in a random volume. */
private long createMetaFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
long getRemaining() throws IOException {
  long remaining = 0L;
  for (FsVolumeSpi vol : volumes) {
    try (FsVolumeReference ref = vol.obtainReference()) {
      remaining += vol.getAvailable();
    } catch (ClosedChannelException e) {
      // ignore
    }
  }
  return remaining;
}
private void setVolumeFull(DataNode dn, StorageType type) {
  try (FsDatasetSpi.FsVolumeReferences refs = dn.getFSDataset()
      .getFsVolumeReferences()) {
    for (FsVolumeSpi fvs : refs) {
      FsVolumeImpl volume = (FsVolumeImpl) fvs;
      if (volume.getStorageType() == type) {
        LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]" +
            volume.getStorageID());
        volume.setCapacityForTesting(0);
      }
    }
  } catch (IOException e) {
    LOG.error("Unexpected exception by closing FsVolumeReference", e);
  }
}
/**
 * Make sure at least one non-transient volume has a saved copy of the replica.
 * An infinite loop is used to ensure the async lazy persist tasks are completely
 * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
 * either a successful pass or timeout failure.
 */
protected final void ensureLazyPersistBlocksAreSaved(
    LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
  final String bpid = cluster.getNamesystem().getBlockPoolId();
  final Set<Long> persistedBlockIds = new HashSet<Long>();

  try (FsDatasetSpi.FsVolumeReferences volumes =
      cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
    while (persistedBlockIds.size() <
        locatedBlocks.getLocatedBlocks().size()) {
      // Take 1 second sleep before each verification iteration
      Thread.sleep(1000);

      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
        for (FsVolumeSpi v : volumes) {
          if (v.isTransientStorage()) {
            continue;
          }

          FsVolumeImpl volume = (FsVolumeImpl) v;
          File lazyPersistDir =
              volume.getBlockPoolSlice(bpid).getLazypersistDir();

          long blockId = lb.getBlock().getBlockId();
          File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
          File blockFile = new File(targetDir, lb.getBlock().getBlockName());
          if (blockFile.exists()) {
            // Found a persisted copy for this block and added to the Set
            persistedBlockIds.add(blockId);
          }
        }
      }
    }
  }

  // We should have found a persisted copy for each located block.
  assertThat(persistedBlockIds.size(),
      is(locatedBlocks.getLocatedBlocks().size()));
}
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
    throws IOException, InterruptedException {

  LOG.info("Verifying replica has no saved copy after deletion.");
  triggerBlockReport();

  while (cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions() > 0L) {
    Thread.sleep(1000);
  }

  final String bpid = cluster.getNamesystem().getBlockPoolId();
  final FsDatasetSpi<?> dataset =
      cluster.getDataNodes().get(0).getFSDataset();

  // Make sure the deleted replica does not have a copy on either the
  // finalized dir of a transient volume or the lazy-persist dir of a
  // non-transient volume.
  try (FsDatasetSpi.FsVolumeReferences volumes =
      dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : volumes) {
      FsVolumeImpl volume = (FsVolumeImpl) vol;
      File targetDir = (volume.isTransientStorage()) ?
          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
          volume.getBlockPoolSlice(bpid).getLazypersistDir();
      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
        return false;
      }
    }
  }
  return true;
}
@Override
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
      vol.getCurrentDir().getParentFile());
  dataset.volumeMap.add(block.getBlockPoolId(), info);
  info.getBlockFile().createNewFile();
  info.getMetaFile().createNewFile();
  return info;
}
@Override
public Replica createReplicaInPipeline(
    FsVolumeSpi volume, ExtendedBlock block) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInPipeline rip = new ReplicaInPipeline(
      block.getBlockId(), block.getGenerationStamp(), volume,
      vol.createTmpFile(
          block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
      0);
  dataset.volumeMap.add(block.getBlockPoolId(), rip);
  return rip;
}
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaBeingWritten rbw = new ReplicaBeingWritten(
      eb.getLocalBlock(), volume,
      vol.createRbwFile(bpid, block).getParentFile(), null);
  rbw.getBlockFile().createNewFile();
  rbw.getMetaFile().createNewFile();
  dataset.volumeMap.add(bpid, rbw);
  return rbw;
}
@Override
public Replica createReplicaWaitingToBeRecovered(
    FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaWaitingToBeRecovered rwbr =
      new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
          vol.createRbwFile(bpid, block).getParentFile());
  dataset.volumeMap.add(bpid, rwbr);
  return rwbr;
}
/** Create a block file and the corresponding metafile in a random volume. */
private long createBlockMetaFile() throws IOException {
  List<? extends FsVolumeSpi> volumes = fds.getVolumes();
  int index = rand.nextInt(volumes.size() - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());

    // Create files with same prefix as block file but extension names
    // such that during sorting, these files appear around meta file
    // to test how DirectoryScanner handles extraneous files
    String name1 = file.getAbsolutePath() + ".l";
    String name2 = file.getAbsolutePath() + ".n";
    file = new File(name1);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name1);
    }

    file = new File(name2);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name2);
    }

    file = new File(finalizedDir, getMetaFile(id));
    if (file.createNewFile()) {
      LOG.info("Created metafile " + file.getName());
    }
  }
  return id;
}