/** * This should be primarily used for testing. * @return clone of replica store in datanode memory */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
/** * Bump a replica's generation stamp to a new one. * Its on-disk meta file name is renamed to be the new one too. * * @param replicaInfo a replica * @param newGS new generation stamp * @throws IOException if rename fails */ private void bumpReplicaGS(ReplicaInfo replicaInfo, long newGS) throws IOException { long oldGS = replicaInfo.getGenerationStamp(); File oldmeta = replicaInfo.getMetaFile(); replicaInfo.setGenerationStamp(newGS); File newmeta = replicaInfo.getMetaFile(); // rename meta file to new GS if (LOG.isDebugEnabled()) { LOG.debug("Renaming " + oldmeta + " to " + newmeta); } try { NativeIO.renameTo(oldmeta, newmeta); } catch (IOException e) { replicaInfo.setGenerationStamp(oldGS); // restore old GS throw new IOException("Block " + replicaInfo + " reopen failed. " + " Unable to move meta file " + oldmeta + " to " + newmeta, e); } }
/** * Remove the temporary block file (if any) */ @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), replicaInfo.getMetaFile(), b.getLocalBlock())) { LOG.warn("Block " + b + " unfinalized and removed. " ); } if (replicaInfo.getVolume().isTransientStorage()) { ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true); } } }
/** * Check if a block is valid. * * @param b The block to check. * @param minLength The minimum length that the block must have. May be 0. * @param state If this is null, it is ignored. If it is non-null, we * will check that the replica has this state. * * @throws ReplicaNotFoundException If the replica is not found * * @throws UnexpectedReplicaStateException If the replica is not in the * expected state. * @throws FileNotFoundException If the block file is not found or there * was an error locating it. * @throws EOFException If the replica length is too short. * * @throws IOException May be thrown from the methods called. */ public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) throws ReplicaNotFoundException, UnexpectedReplicaStateException, FileNotFoundException, EOFException, IOException { final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo == null) { throw new ReplicaNotFoundException(b); } if (replicaInfo.getState() != state) { throw new UnexpectedReplicaStateException(b,state); } if (!replicaInfo.getBlockFile().exists()) { throw new FileNotFoundException(replicaInfo.getBlockFile().getPath()); } long onDiskLength = getLength(b); if (onDiskLength < minLength) { throw new EOFException(b + "'s on-disk length " + onDiskLength + " is shorter than minLength " + minLength); } }
/** Check the files of a replica. */ static void checkReplicaFiles(final ReplicaInfo r) throws IOException { //check replica's file final File f = r.getBlockFile(); if (!f.exists()) { throw new FileNotFoundException("File " + f + " not found, r=" + r); } if (r.getBytesOnDisk() != f.length()) { throw new IOException("File length mismatched. The length of " + f + " is " + f.length() + " but r=" + r); } //check replica's meta file final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp()); if (!metafile.exists()) { throw new IOException("Metafile " + metafile + " does not exist, r=" + r); } if (metafile.length() == 0) { throw new IOException("Metafile " + metafile + " is empty, r=" + r); } }
/** * Invalidate a block but does not delete the actual on-disk block file. * * It should only be used when deactivating disks. * * @param bpid the block pool ID. * @param block The block to be invalidated. */ public void invalidate(String bpid, ReplicaInfo block) { // If a DFSClient has the replica in its cache of short-circuit file // descriptors (and the client is using ShortCircuitShm), invalidate it. // The short-circuit registry is null in the unit tests, because the // datanode is mock object. if (datanode.getShortCircuitRegistry() != null) { datanode.getShortCircuitRegistry().processBlockInvalidation( new ExtendedBlockId(block.getBlockId(), bpid)); // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, block.getBlockId()); } datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block), block.getStorageUuid()); }
/** * Remove the replica's meta information from the map that matches * the input block's id and generation stamp * @param bpid block pool id * @param block block with its id as the key * @return the removed replica's meta information * @throws IllegalArgumentException if the input block is null */ ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); synchronized(mutex) { Map<Long, ReplicaInfo> m = map.get(bpid); if (m != null) { Long key = Long.valueOf(block.getBlockId()); ReplicaInfo replicaInfo = m.get(key); if (replicaInfo != null && block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { return m.remove(key); } } } return null; }
private static void createUnlinkTmpFile(ReplicaInfo replicaInfo, boolean changeBlockFile, boolean isRename) throws IOException { File src; if (changeBlockFile) { src = replicaInfo.getBlockFile(); } else { src = replicaInfo.getMetaFile(); } File dst = DatanodeUtil.getUnlinkTmpFile(src); if (isRename) { src.renameTo(dst); } else { FileInputStream in = new FileInputStream(src); try { FileOutputStream out = new FileOutputStream(dst); try { IOUtils.copyBytes(in, out, 1); } finally { out.close(); } } finally { in.close(); } } }
/** * Remove the replica's meta information from the map that matches * the input block's id and generation stamp * @param bpid block pool id * @param block block with its id as the key * @return the removed replica's meta information * @throws IllegalArgumentException if the input block is null */ ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); synchronized(mutex) { LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid); if (m != null) { ReplicaInfo replicaInfo = m.get(block); if (replicaInfo != null && block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { return m.remove(block); } } } return null; }
/** * Remove the temporary block file (if any) */ @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), replicaInfo.getMetaFile(), b.getLocalBlock())) { LOG.warn("Block " + b + " unfinalized and removed. "); } } }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
/** * Returns handles to the block file and its metadata file */ @Override // FsDatasetSpi public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { File blockFile = info.getBlockFile(); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (blkOffset > 0) { blockInFile.seek(blkOffset); } File metaFile = info.getMetaFile(); RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r"); if (ckoff > 0) { metaInFile.seek(ckoff); } return new ReplicaInputStreams( blockInFile.getFD(), metaInFile.getFD(), ref); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline createTemporary(StorageType storageType, ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() && replicaInfo instanceof ReplicaInPipeline) { // Stop the previous writer ((ReplicaInPipeline)replicaInfo) .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); } else { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } } FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), 0); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
/** * Returns a clone of a replica stored in data-node memory. * Should be primarily used for testing. * @param blockId * @return */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
/** * Returns handles to the block file and its metadata file */ @Override // FsDatasetSpi public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); File blockFile = info.getBlockFile(); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (blkOffset > 0) { blockInFile.seek(blkOffset); } File metaFile = info.getMetaFile(); RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r"); if (ckoff > 0) { metaInFile.seek(ckoff); } return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD()); }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
/** * Remove the temporary block file (if any) */ @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), replicaInfo.getMetaFile(), b.getLocalBlock())) { LOG.warn("Block " + b + " unfinalized and removed. " ); } } }