@Override // FsDatasetSpi public StorageReport[] getStorageReports(String bpid) throws IOException { List<StorageReport> reports; synchronized (statsLock) { List<FsVolumeImpl> curVolumes = getVolumes(); reports = new ArrayList<>(curVolumes.size()); for (FsVolumeImpl volume : curVolumes) { try (FsVolumeReference ref = volume.obtainReference()) { StorageReport sr = new StorageReport(volume.toDatanodeStorage(), false, volume.getCapacity(), volume.getDfsUsed(), volume.getAvailable(), volume.getBlockPoolUsed(bpid)); reports.add(sr); } catch (ClosedChannelException e) { continue; } } } return reports.toArray(new StorageReport[reports.size()]); }
private boolean transientFreeSpaceBelowThreshold() throws IOException { long free = 0; long capacity = 0; float percentFree = 0.0f; // Don't worry about fragmentation for now. We don't expect more than one // transient volume per DN. for (FsVolumeImpl v : getVolumes()) { try (FsVolumeReference ref = v.obtainReference()) { if (v.isTransientStorage()) { capacity += v.getCapacity(); free += v.getAvailable(); } } catch (ClosedChannelException e) { // ignore. } } if (capacity == 0) { return false; } percentFree = (float) ((double)free * 100 / capacity); return (percentFree < lowWatermarkFreeSpacePercentage) || (free < lowWatermarkFreeSpaceBytes); }
/** * Asynchronously lazy persist the block from the RamDisk to Disk. */ void submitLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, File metaFile, File blockFile, FsVolumeReference target) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: " + bpId + " block id: " + blockId); } FsVolumeImpl volume = (FsVolumeImpl)target.getVolume(); File lazyPersistDir = volume.getLazyPersistDir(bpId); if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) { FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir); throw new IOException("LazyWriter fail to find or create lazy persist dir: " + lazyPersistDir.toString()); } ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask( bpId, blockId, genStamp, creationTime, blockFile, metaFile, target, lazyPersistDir); execute(volume.getCurrentDir(), lazyPersistTask); }
VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) { this.conf = conf; this.datanode = datanode; this.ref = ref; this.volume = ref.getVolume(); ScanResultHandler handler; try { handler = conf.resultHandler.newInstance(); } catch (Throwable e) { LOG.error("unable to instantiate {}", conf.resultHandler, e); handler = new ScanResultHandler(); } this.resultHandler = handler; setName("VolumeScannerThread(" + volume.getBasePath() + ")"); setDaemon(true); }
@Test public void testGetNextVolumeWithClosedVolume() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser); List<FsVolumeImpl> volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "nextvolume-" + i); curDir.mkdirs(); FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir, conf, StorageType.DEFAULT); volume.setCapacityForTesting(1024 * 1024 * 1024); volumes.add(volume); volumeList.addVolume(volume.obtainReference()); } // Close the second volume. volumes.get(1).closeAndWait(); for (int i = 0; i < 10; i++) { try (FsVolumeReference ref = volumeList.getNextVolume(StorageType.DEFAULT, 128)) { // volume No.2 will not be chosen. assertNotEquals(ref.getVolume(), volumes.get(1)); } } }
@Test public void testReleaseVolumeRefIfNoBlockScanner() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.<VolumeFailureInfo>emptyList(), null, blockChooser); File volDir = new File(baseDir, "volume-0"); volDir.mkdirs(); FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf, StorageType.DEFAULT); FsVolumeReference ref = volume.obtainReference(); volumeList.addVolume(ref); try { ref.close(); fail("Should throw exception because the reference is closed in " + "VolumeList#addVolume()."); } catch (IllegalStateException e) { } }
@Override // FsDatasetSpi public StorageReport[] getStorageReports(String bpid) throws IOException { List<StorageReport> reports; synchronized (statsLock) { List<FsVolumeImpl> curVolumes = volumes.getVolumes(); reports = new ArrayList<>(curVolumes.size()); for (FsVolumeImpl volume : curVolumes) { try (FsVolumeReference ref = volume.obtainReference()) { StorageReport sr = new StorageReport(volume.toDatanodeStorage(), false, volume.getCapacity(), volume.getDfsUsed(), volume.getAvailable(), volume.getBlockPoolUsed(bpid)); reports.add(sr); } catch (ClosedChannelException e) { continue; } } } return reports.toArray(new StorageReport[reports.size()]); }
@Override public void run() { boolean succeeded = false; final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset(); try (FsVolumeReference ref = this.targetVolume) { int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF); // No FsDatasetImpl lock for the file copy File targetFiles[] = FsDatasetImpl.copyBlockFiles( blockId, genStamp, metaFile, blockFile, lazyPersistDir, true, smallBufferSize, conf); // Lock FsDataSetImpl during onCompleteLazyPersist callback dataset.onCompleteLazyPersist(bpId, blockId, creationTime, targetFiles, (FsVolumeImpl)ref.getVolume()); succeeded = true; } catch (Exception e){ FsDatasetImpl.LOG.warn( "LazyWriter failed to async persist RamDisk block pool id: " + bpId + "block Id: " + blockId, e); } finally { if (!succeeded) { dataset.onFailLazyPersist(bpId, blockId); } } }
/** * Dynamically add new volumes to the existing volumes that this DN manages. * * @param ref a reference to the new FsVolumeImpl instance. */ void addVolume(FsVolumeReference ref) { FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume(); volumes.add(volume); if (blockScanner != null) { blockScanner.addVolumeScanner(ref); } else { // If the volume is not put into a volume scanner, it does not need to // hold the reference. IOUtils.cleanup(FsDatasetImpl.LOG, ref); } // If the volume is used to replace a failed volume, it needs to reset the // volume failure info for this volume. removeVolumeFailureInfo(new File(volume.getBasePath())); FsDatasetImpl.LOG.info("Added new volume: " + volume.getStorageID()); }
/** * Returns handles to the block file and its metadata file */ @Override // FsDatasetSpi public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { File blockFile = info.getBlockFile(); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (blkOffset > 0) { blockInFile.seek(blkOffset); } File metaFile = info.getMetaFile(); RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r"); if (ckoff > 0) { metaInFile.seek(ckoff); } return new ReplicaInputStreams( blockInFile.getFD(), metaInFile.getFD(), ref); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } }
/** * Dynamically add new volumes to the existing volumes that this DN manages. * * @param ref a reference to the new FsVolumeImpl instance. */ void addVolume(FsVolumeReference ref) { while (true) { final FsVolumeImpl[] curVolumes = volumes.get(); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); volumeList.add((FsVolumeImpl)ref.getVolume()); if (volumes.compareAndSet(curVolumes, volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { break; } else { if (FsDatasetImpl.LOG.isDebugEnabled()) { FsDatasetImpl.LOG.debug( "The volume list has been changed concurrently, " + "retry to remove volume: " + ref.getVolume().getStorageID()); } } } if (blockScanner != null) { blockScanner.addVolumeScanner(ref); } // If the volume is used to replace a failed volume, it needs to reset the // volume failure info for this volume. removeVolumeFailureInfo(new File(ref.getVolume().getBasePath())); FsDatasetImpl.LOG.info("Added new volume: " + ref.getVolume().getStorageID()); }
/** * Delete the block file and meta file from the disk asynchronously, adjust * dfsUsed statistics accordingly. */ void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile, ExtendedBlock block, String trashDirectory) { LOG.info("Scheduling " + block.getLocalBlock() + " file " + blockFile + " for deletion"); ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask( volumeRef, blockFile, metaFile, block, trashDirectory); execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask); }
ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile, File metaFile, ExtendedBlock block, String trashDirectory) { this.volumeRef = volumeRef; this.volume = (FsVolumeImpl) volumeRef.getVolume(); this.blockFile = blockFile; this.metaFile = metaFile; this.block = block; this.trashDirectory = trashDirectory; }
private void addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd) throws IOException { final File dir = sd.getCurrentDir(); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is // nothing needed to be rolled back to make various data structures, e.g., // storageMap and asyncDiskService, consistent. FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); FsVolumeReference ref = fsVolume.obtainReference(); ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); synchronized (this) { volumeMap.addAll(tempVolumeMap); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); volumes.addVolume(ref); } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); }
@Override // FsDatasetSpi public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client // re-opens the connection and retries sending those packets. // The other reason is that an "append" is occurring to this block. // check the validity of the parameter if (newGS < b.getGenerationStamp()) { throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp"); } ReplicaInfo replicaInfo = getReplicaInfo(b); LOG.info("Appending to " + replicaInfo); if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNFINALIZED_REPLICA + b); } if (replicaInfo.getNumBytes() != expectedBlockLen) { throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + expectedBlockLen); } FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaBeingWritten replica = null; try { replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(replica, ref); }
private File[] copyReplicaWithNewBlockIdAndGS( ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) throws IOException { String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; FsVolumeReference v = volumes.getNextVolume( replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes()); final File tmpDir = ((FsVolumeImpl) v.getVolume()) .getBlockPoolSlice(bpid).getTmpDir(); final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); final File dstBlockFile = new File(destDir, blockFileName); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), dstMetaFile, dstBlockFile, true); }
ReplicaLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, File blockFile, File metaFile, FsVolumeReference targetVolume, File lazyPersistDir) { this.bpId = bpId; this.blockId = blockId; this.genStamp = genStamp; this.creationTime = creationTime; this.blockFile = blockFile; this.metaFile = metaFile; this.targetVolume = targetVolume; this.lazyPersistDir = lazyPersistDir; }
private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize) throws IOException { while (true) { FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize); try { return volume.obtainReference(); } catch (ClosedChannelException e) { FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume); // blockChooser.chooseVolume returns DiskOutOfSpaceException when the list // is empty, indicating that all volumes are closed. list.remove(volume); } } }
/** * Get next volume. * * @param blockSize free space needed on the volume * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ FsVolumeReference getNextVolume(StorageType storageType, long blockSize) throws IOException { // Get a snapshot of currently available volumes. final FsVolumeImpl[] curVolumes = volumes.get(); final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length); for(FsVolumeImpl v : curVolumes) { if (v.getStorageType() == storageType) { list.add(v); } } return chooseVolume(list, blockSize); }
/** * Get next volume. * * @param blockSize free space needed on the volume * @return next volume to store the block in. */ FsVolumeReference getNextTransientVolume(long blockSize) throws IOException { // Get a snapshot of currently available volumes. final List<FsVolumeImpl> curVolumes = getVolumes(); final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size()); for(FsVolumeImpl v : curVolumes) { if (v.isTransientStorage()) { list.add(v); } } return chooseVolume(list, blockSize); }
long getDfsUsed() throws IOException { long dfsUsed = 0L; for (FsVolumeImpl v : volumes.get()) { try(FsVolumeReference ref = v.obtainReference()) { dfsUsed += v.getDfsUsed(); } catch (ClosedChannelException e) { // ignore. } } return dfsUsed; }
long getBlockPoolUsed(String bpid) throws IOException { long dfsUsed = 0L; for (FsVolumeImpl v : volumes.get()) { try (FsVolumeReference ref = v.obtainReference()) { dfsUsed += v.getBlockPoolUsed(bpid); } catch (ClosedChannelException e) { // ignore. } } return dfsUsed; }
long getCapacity() { long capacity = 0L; for (FsVolumeImpl v : volumes.get()) { try (FsVolumeReference ref = v.obtainReference()) { capacity += v.getCapacity(); } catch (IOException e) { // ignore. } } return capacity; }
long getRemaining() throws IOException { long remaining = 0L; for (FsVolumeSpi vol : volumes.get()) { try (FsVolumeReference ref = vol.obtainReference()) { remaining += vol.getAvailable(); } catch (ClosedChannelException e) { // ignore } } return remaining; }
/** * Dynamically add new volumes to the existing volumes that this DN manages. * * @param ref a reference to the new FsVolumeImpl instance. */ void addVolume(FsVolumeReference ref) { while (true) { final FsVolumeImpl[] curVolumes = volumes.get(); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); volumeList.add((FsVolumeImpl)ref.getVolume()); if (volumes.compareAndSet(curVolumes, volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { break; } else { if (FsDatasetImpl.LOG.isDebugEnabled()) { FsDatasetImpl.LOG.debug( "The volume list has been changed concurrently, " + "retry to remove volume: " + ref.getVolume().getStorageID()); } } } if (blockScanner != null) { blockScanner.addVolumeScanner(ref); } else { // If the volume is not put into a volume scanner, it does not need to // hold the reference. IOUtils.cleanup(FsDatasetImpl.LOG, ref); } // If the volume is used to replace a failed volume, it needs to reset the // volume failure info for this volume. removeVolumeFailureInfo(new File(ref.getVolume().getBasePath())); FsDatasetImpl.LOG.info("Added new volume: " + ref.getVolume().getStorageID()); }
/** * Set up a scanner for the given block pool and volume. * * @param ref A reference to the volume. */ public synchronized void addVolumeScanner(FsVolumeReference ref) { boolean success = false; try { FsVolumeSpi volume = ref.getVolume(); if (!isEnabled()) { LOG.debug("Not adding volume scanner for {}, because the block " + "scanner is disabled.", volume.getBasePath()); return; } VolumeScanner scanner = scanners.get(volume.getStorageID()); if (scanner != null) { LOG.error("Already have a scanner for volume {}.", volume.getBasePath()); return; } LOG.debug("Adding scanner for volume {} (StorageID {})", volume.getBasePath(), volume.getStorageID()); scanner = new VolumeScanner(conf, datanode, ref); scanner.start(); scanners.put(volume.getStorageID(), scanner); success = true; } finally { if (!success) { // If we didn't create a new VolumeScanner object, we don't // need this reference to the volume. IOUtils.cleanup(null, ref); } } }