/** * This should be primarily used for testing. * @return clone of replica store in datanode memory */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
/** * Returns a clone of a replica stored in data-node memory. * Should be primarily used for testing. * @param blockId * @return */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
/** * Returns a clone of a replica stored in data-node memory. * Should be primarily used for testing. * * @param blockId * @return */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if (r == null) { return null; } switch (r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica) r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten) r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered) r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery) r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline) r); } return null; }
private File[] copyReplicaWithNewBlockIdAndGS( ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) throws IOException { String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; FsVolumeReference v = volumes.getNextVolume( replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes()); final File tmpDir = ((FsVolumeImpl) v.getVolume()) .getBlockPoolSlice(bpid).getTmpDir(); final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); final File dstBlockFile = new File(destDir, blockFileName); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), dstMetaFile, dstBlockFile, true); }
private File[] copyReplicaWithNewBlockIdAndGS( ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) throws IOException { String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume(); final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir(); final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); final File dstBlockFile = new File(destDir, blockFileName); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), dstMetaFile, dstBlockFile, true, smallBufferSize, conf); }
@Override public Replica createReplicaUnderRecovery( ExtendedBlock block, long recoveryId) throws IOException { try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0); ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica( block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()), recoveryId ); dataset.volumeMap.add(block.getBlockPoolId(), rur); return rur; } }
private FinalizedReplica updateReplicaUnderRecovery( String bpid, ReplicaUnderRecovery rur, long recoveryId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId + ", rur=" + rur); } // bump rur's GS to be recovery id bumpReplicaGS(rur, recoveryId); //update length final File replicafile = rur.getBlockFile(); if (rur.getNumBytes() < newlength) { throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.unlinkBlock(1); truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength); // update RUR with the new length rur.setNumBytes(newlength); } // finalize the block return finalizeReplica(bpid, rur); }
private FinalizedReplica updateReplicaUnderRecovery(String bpid, ReplicaUnderRecovery rur, long recoveryId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { throw new IOException( "rur.getRecoveryID() != recoveryId = " + recoveryId + ", rur=" + rur); } // bump rur's GS to be recovery id bumpReplicaGS(rur, recoveryId); //update length final File replicafile = rur.getBlockFile(); if (rur.getNumBytes() < newlength) { throw new IOException( "rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.unlinkBlock(1); truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength); // update RUR with the new length rur.setNumBytesNoPersistance(newlength); } // finalize the block return finalizeReplica(bpid, rur); }
private FinalizedReplica updateReplicaUnderRecovery( String bpid, ReplicaUnderRecovery rur, long recoveryId, long newBlockId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId + ", rur=" + rur); } boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId; File blockFile; File metaFile; // bump rur's GS to be recovery id if(!copyOnTruncate) { bumpReplicaGS(rur, recoveryId); blockFile = rur.getBlockFile(); metaFile = rur.getMetaFile(); } else { File[] copiedReplicaFiles = copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); blockFile = copiedReplicaFiles[1]; metaFile = copiedReplicaFiles[0]; } //update length if (rur.getNumBytes() < newlength) { throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.unlinkBlock(1); truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); if(!copyOnTruncate) { // update RUR with the new length rur.setNumBytes(newlength); } else { // Copying block to a new block with new blockId. // Not truncating original block. ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(), newlength); newReplicaInfo.setNumBytes(newlength); volumeMap.add(bpid, newReplicaInfo); finalizeReplica(bpid, newReplicaInfo); } } // finalize the block return finalizeReplica(bpid, rur); }
private FinalizedReplica updateReplicaUnderRecovery( String bpid, ReplicaUnderRecovery rur, long recoveryId, long newBlockId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId + ", rur=" + rur); } boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId; File blockFile; File metaFile; // bump rur's GS to be recovery id if(!copyOnTruncate) { bumpReplicaGS(rur, recoveryId); blockFile = rur.getBlockFile(); metaFile = rur.getMetaFile(); } else { File[] copiedReplicaFiles = copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); blockFile = copiedReplicaFiles[1]; metaFile = copiedReplicaFiles[0]; } //update length if (rur.getNumBytes() < newlength) { throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.breakHardLinksIfNeeded(); truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); if(!copyOnTruncate) { // update RUR with the new length rur.setNumBytes(newlength); } else { // Copying block to a new block with new blockId. // Not truncating original block. FsVolumeSpi volume = rur.getVolume(); String blockPath = blockFile.getAbsolutePath(); String volumePath = volume.getBasePath(); assert blockPath.startsWith(volumePath) : "New block file: " + blockPath + " must be on " + "same volume as recovery replica: " + volumePath; ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( newBlockId, recoveryId, volume, blockFile.getParentFile(), newlength); newReplicaInfo.setNumBytes(newlength); volumeMap.add(bpid, newReplicaInfo); finalizeReplica(bpid, newReplicaInfo); } } // finalize the block return finalizeReplica(bpid, rur); }
/** * We're informed that a block is no longer valid. We * could lazily garbage-collect the block, but why bother? * just get rid of it. */ @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (int i = 0; i < invalidBlks.length; i++) { final File f; final FsVolumeImpl v; synchronized (this) { f = getFile(bpid, invalidBlks[i].getBlockId()); ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); if (info == null) { LOG.warn("Failed to delete replica " + invalidBlks[i] + ": ReplicaInfo not found."); error = true; continue; } if (info.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) { LOG.warn("Failed to delete replica " + invalidBlks[i] + ": GenerationStamp not matched, info=" + info); error = true; continue; } v = (FsVolumeImpl)info.getVolume(); if (f == null) { LOG.warn("Failed to delete replica " + invalidBlks[i] + ": File not found, volume=" + v); error = true; continue; } if (v == null) { LOG.warn("Failed to delete replica " + invalidBlks[i] + ". No volume for this replica, file=" + f + "."); error = true; continue; } File parent = f.getParentFile(); if (parent == null) { LOG.warn("Failed to delete replica " + invalidBlks[i] + ". Parent not found for file " + f + "."); error = true; continue; } ReplicaState replicaState = info.getState(); if (replicaState == ReplicaState.FINALIZED || (replicaState == ReplicaState.RUR && ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == ReplicaState.FINALIZED)) { v.clearPath(bpid, parent); } volumeMap.remove(bpid, invalidBlks[i]); } // Delete the block asynchronously to make sure we can do it fast enough asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), new ExtendedBlock(bpid, invalidBlks[i])); } if (error) { throw new IOException("Error in deleting blocks."); } }
/** * We're informed that a block is no longer valid. We * could lazily garbage-collect the block, but why bother? * just get rid of it. */ @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (Block invalidBlk : invalidBlks) { final File f; final FsVolumeImpl v; synchronized (this) { f = getFile(bpid, invalidBlk.getBlockId()); ReplicaInfo info = volumeMap.get(bpid, invalidBlk); if (info == null) { LOG.warn("Failed to delete replica " + invalidBlk + ": ReplicaInfo not found."); error = true; continue; } if (info.getGenerationStamp() != invalidBlk.getGenerationStamp()) { LOG.warn("Failed to delete replica " + invalidBlk + ": GenerationStamp not matched, info=" + info); error = true; continue; } v = (FsVolumeImpl) info.getVolume(); if (f == null) { LOG.warn("Failed to delete replica " + invalidBlk + ": File not found, volume=" + v); error = true; continue; } if (v == null) { LOG.warn("Failed to delete replica " + invalidBlk + ". No volume for this replica, file=" + f + "."); error = true; continue; } File parent = f.getParentFile(); if (parent == null) { LOG.warn("Failed to delete replica " + invalidBlk + ". Parent not found for file " + f + "."); error = true; continue; } ReplicaState replicaState = info.getState(); if (replicaState == ReplicaState.FINALIZED || (replicaState == ReplicaState.RUR && ((ReplicaUnderRecovery) info).getOriginalReplica().getState() == ReplicaState.FINALIZED)) { v.clearPath(bpid, parent); } volumeMap.remove(bpid, invalidBlk); } // Delete the block asynchronously to make sure we can do it fast enough asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlk.getGenerationStamp()), new ExtendedBlock(bpid, invalidBlk)); } if (error) { throw new IOException("Error in deleting blocks."); } }