/**
 * Looks up a replica in the in-memory volume map and hands back a copy of it.
 * This should be used primarily for testing.
 *
 * @param bpid block pool id to look the replica up in
 * @param blockId id of the block whose replica is wanted
 * @return a copy of the stored replica, or {@code null} when no replica is
 *         stored for the block or its state is not one of those handled below
 */
ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
  final ReplicaInfo stored = volumeMap.get(bpid, blockId);
  if (stored == null) {
    return null;
  }
  ReplicaInfo copy = null;
  switch (stored.getState()) {
    case FINALIZED:
      copy = new FinalizedReplica((FinalizedReplica) stored);
      break;
    case RBW:
      copy = new ReplicaBeingWritten((ReplicaBeingWritten) stored);
      break;
    case RWR:
      copy = new ReplicaWaitingToBeRecovered(
          (ReplicaWaitingToBeRecovered) stored);
      break;
    case RUR:
      copy = new ReplicaUnderRecovery((ReplicaUnderRecovery) stored);
      break;
    case TEMPORARY:
      copy = new ReplicaInPipeline((ReplicaInPipeline) stored);
      break;
    default:
      // Any other state produces no copy.
      break;
  }
  return copy;
}
/**
 * Reports two finalized replicas plus one RBW and one RWR replica and checks
 * the exact long[] encoding of the resulting block report.
 */
@Test
public void testMix() {
  final BlockListAsLongs report = checkReport(
      new FinalizedReplica(b1, null, null),
      new FinalizedReplica(b2, null, null),
      new ReplicaBeingWritten(b3, null, null, null),
      new ReplicaWaitingToBeRecovered(b4, null, null));
  final long[] expected = {
      // leading counts (presumably: finalized entries, under-construction entries)
      2, 2,
      // finalized entries, three longs each
      1, 11, 111,
      2, 22, 222,
      // separator between the two sections
      -1, -1, -1,
      // under-construction entries also carry their replica state
      3, 33, 333, ReplicaState.RBW.getValue(),
      4, 44, 444, ReplicaState.RWR.getValue()
  };
  assertArrayEquals(expected, report.getBlockListAsLongs());
}
/**
 * Builds a large, seeded-random mix of finalized, RBW and RWR replicas and
 * runs them through {@code checkReport}.
 */
@Test
public void testFuzz() throws InterruptedException {
  Replica[] replicas = new Replica[100000];
  Random rand = new Random(0);
  for (int i = 0; i < replicas.length; i++) {
    Block b = new Block(rand.nextLong(), i, i << 4);
    // nextInt(3) yields 0..2, so every case below is reachable. With the
    // previous nextInt(2), the RWR branch (case 2) was dead code and RWR
    // replicas were never fuzzed.
    switch (rand.nextInt(3)) {
      case 0:
        replicas[i] = new FinalizedReplica(b, null, null);
        break;
      case 1:
        replicas[i] = new ReplicaBeingWritten(b, null, null, null);
        break;
      case 2:
        replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
        break;
    }
  }
  checkReport(replicas);
}
/**
 * Returns a clone of a replica stored in data-node memory.
 * Should be primarily used for testing.
 *
 * @param bpid block pool id to look the replica up in
 * @param blockId id of the block whose replica is wanted
 * @return a clone of the stored replica, or {@code null} if none is stored or
 *         its state is not one of the cloned states
 */
ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
  final ReplicaInfo replica = volumeMap.get(bpid, blockId);
  if (replica != null) {
    switch (replica.getState()) {
      case FINALIZED:
        return new FinalizedReplica((FinalizedReplica) replica);
      case RBW:
        return new ReplicaBeingWritten((ReplicaBeingWritten) replica);
      case RWR:
        return new ReplicaWaitingToBeRecovered(
            (ReplicaWaitingToBeRecovered) replica);
      case RUR:
        return new ReplicaUnderRecovery((ReplicaUnderRecovery) replica);
      case TEMPORARY:
        return new ReplicaInPipeline((ReplicaInPipeline) replica);
      default:
        break;
    }
  }
  return null;
}
@Override // FsDatasetSpi public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. if (replicaInfo.getState() == ReplicaState.FINALIZED ) { return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, b.getNumBytes()); } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten)replicaInfo; } }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
/**
 * Produces a copy of the replica held in data-node memory for the given block.
 * Should be primarily used for testing.
 *
 * @param bpid block pool id
 * @param blockId block id
 * @return the copied replica, or {@code null} when there is no stored replica
 *         or its state has no copy case below
 */
ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
  final ReplicaInfo original = volumeMap.get(bpid, blockId);
  if (original == null) {
    return null;
  }

  final ReplicaInfo clone;
  switch (original.getState()) {
    case FINALIZED:
      clone = new FinalizedReplica((FinalizedReplica) original);
      break;
    case RBW:
      clone = new ReplicaBeingWritten((ReplicaBeingWritten) original);
      break;
    case RWR:
      clone = new ReplicaWaitingToBeRecovered(
          (ReplicaWaitingToBeRecovered) original);
      break;
    case RUR:
      clone = new ReplicaUnderRecovery((ReplicaUnderRecovery) original);
      break;
    case TEMPORARY:
      clone = new ReplicaInPipeline((ReplicaInPipeline) original);
      break;
    default:
      clone = null;
      break;
  }
  return clone;
}
@Override // FsDatasetSpi public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. if (replicaInfo.getState() == ReplicaState.FINALIZED) { return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, b.getNumBytes()); } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten) replicaInfo; } }
@Override // FsDatasetSpi public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client // re-opens the connection and retries sending those packets. // The other reason is that an "append" is occurring to this block. // check the validity of the parameter if (newGS < b.getGenerationStamp()) { throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp"); } ReplicaInfo replicaInfo = getReplicaInfo(b); LOG.info("Appending to " + replicaInfo); if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNFINALIZED_REPLICA + b); } if (replicaInfo.getNumBytes() != expectedBlockLen) { throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + expectedBlockLen); } FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaBeingWritten replica = null; try { replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(replica, ref); }
/**
 * Reports a single RBW replica (no finalized ones) and checks the exact
 * long[] encoding of the resulting block report.
 */
@Test
public void testUc() {
  final ReplicaBeingWritten rbw = new ReplicaBeingWritten(b1, null, null, null);
  final BlockListAsLongs report = checkReport(rbw);
  final long[] expected = {
      // leading counts (presumably: finalized entries, under-construction entries)
      0, 1,
      // separator between the finalized and under-construction sections
      -1, -1, -1,
      // the under-construction entry, followed by its replica state
      1, 11, 111, ReplicaState.RBW.getValue()
  };
  assertArrayEquals(expected, report.getBlockListAsLongs());
}
/**
 * Test helper: creates an RBW replica for {@code eb} on {@code volume}.
 * It creates the replica's (empty) block and meta files on disk and
 * registers it in the dataset's volume map.
 */
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
    throws IOException {
  final FsVolumeImpl impl = (FsVolumeImpl) volume;
  final String blockPoolId = eb.getBlockPoolId();
  final Block localBlock = eb.getLocalBlock();

  final ReplicaBeingWritten replica = new ReplicaBeingWritten(
      eb.getLocalBlock(),
      volume,
      impl.createRbwFile(blockPoolId, localBlock).getParentFile(),
      null);

  // Materialize the block and meta files; the return values (whether the
  // files were newly created) are not checked.
  replica.getBlockFile().createNewFile();
  replica.getMetaFile().createNewFile();

  dataset.volumeMap.add(blockPoolId, replica);
  return replica;
}
@Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeImpl v; while (true) { try { if (allowLazyPersist) { // First try to place the block on a transient volume. v = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } else { v = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); allowLazyPersist = false; continue; } throw de; } break; } // create an rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check state if (replicaInfo.getState() != ReplicaState.FINALIZED && replicaInfo.getState() != ReplicaState.RBW) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo); } // check generation stamp long replicaGenerationStamp = replicaInfo.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || replicaGenerationStamp > newGS) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp + ". Expected GS range is [" + b.getGenerationStamp() + ", " + newGS + "]."); } // stop the previous writer before check a replica's length long replicaLen = replicaInfo.getNumBytes(); if (replicaInfo.getState() == ReplicaState.RBW) { ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; // kill the previous writer rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); rbw.setWriter(Thread.currentThread()); // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same if (replicaLen != rbw.getBytesOnDisk() || replicaLen != rbw.getBytesAcked()) { throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() + ") are not the same."); } } // check block length if (replicaLen != expectedBlockLen) { throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaLen + " expected length is " + expectedBlockLen); } return replicaInfo; }
@Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeReference ref; while (true) { try { if (allowLazyPersist) { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } else { ref = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); allowLazyPersist = false; continue; } throw de; } break; } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume File f; try { f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }
@Override // FsDatasetSpi public synchronized ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { LOG.info("Recover RBW replica " + b); ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state if (replicaInfo.getState() != ReplicaState.RBW) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); } ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; LOG.info("Recovering " + rbw); // Stop the previous writer rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); rbw.setWriter(Thread.currentThread()); // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || replicaGenerationStamp > newGS) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b + ". Expected GS range is [" + b.getGenerationStamp() + ", " + newGS + "]."); } // check replica length long bytesAcked = rbw.getBytesAcked(); long numBytes = rbw.getNumBytes(); if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ throw new ReplicaNotFoundException("Unmatched length replica " + replicaInfo + ": BytesAcked = " + bytesAcked + " BytesRcvd = " + numBytes + " are not in the range of [" + minBytesRcvd + ", " + maxBytesRcvd + "]."); } FsVolumeReference ref = rbw.getVolume().obtainReference(); try { // Truncate the potentially corrupt portion. // If the source was client and the last node in the pipeline was lost, // any corrupt data written after the acked length can go unnoticed. 
if (numBytes > bytesAcked) { final File replicafile = rbw.getBlockFile(); truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); rbw.setNumBytes(bytesAcked); rbw.setLastChecksumAndDataLen(bytesAcked, null); } // bump the replica's generation stamp to newGS bumpReplicaGS(rbw, newGS); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(rbw, ref); }
/**
 * Completes recovery of a replica-under-recovery and finalizes it.
 *
 * There are two modes:
 * <ul>
 *   <li>In place, when {@code newBlockId} is not positive or equals the
 *       replica's id. {@code rur}'s GS is bumped to {@code recoveryId}, and
 *       its own block/meta files are truncated to {@code newlength} if
 *       longer.</li>
 *   <li>Copy-on-truncate, when {@code newBlockId > 0} and differs from the
 *       replica's id. The replica files are copied under {@code newBlockId}
 *       and {@code recoveryId}. If truncation is needed, the copies are
 *       truncated, and a new replica for them is added to the volume map and
 *       finalized.</li>
 * </ul>
 * In both modes {@code rur} itself is finalized and returned.
 *
 * @param bpid block pool id
 * @param rur the replica under recovery
 * @param recoveryId expected recovery id; also used as the new GS
 * @param newBlockId id for the copied block in copy-on-truncate mode
 * @param newlength target length; must not exceed the replica's length
 * @return the result of finalizing {@code rur}
 * @throws IOException if the recovery id does not match, or
 *         {@code newlength} exceeds the replica's current length
 */
private FinalizedReplica updateReplicaUnderRecovery(
    String bpid,
    ReplicaUnderRecovery rur,
    long recoveryId,
    long newBlockId,
    long newlength) throws IOException {
  //check recovery id
  if (rur.getRecoveryID() != recoveryId) {
    throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
        + ", rur=" + rur);
  }

  boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
  File blockFile;
  File metaFile;
  // In place: bump rur's GS to the recovery id and work on its own files.
  // Copy-on-truncate: work on copies made under the new block id and GS.
  // NOTE(review): in copy-on-truncate mode rur's own GS is not bumped here.
  if(!copyOnTruncate) {
    bumpReplicaGS(rur, recoveryId);
    blockFile = rur.getBlockFile();
    metaFile = rur.getMetaFile();
  } else {
    // Index 1 is used as the block file and index 0 as the meta file.
    File[] copiedReplicaFiles =
        copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
    blockFile = copiedReplicaFiles[1];
    metaFile = copiedReplicaFiles[0];
  }

  //update length
  // NOTE(review): this check runs after the copy above, so in copy mode a
  // failure here appears to leave the copied files behind — confirm.
  if (rur.getNumBytes() < newlength) {
    throw new IOException("rur.getNumBytes() < newlength = " + newlength
        + ", rur=" + rur);
  }

  if (rur.getNumBytes() > newlength) {
    // NOTE(review): unlinkBlock runs on rur even in copy mode, where the
    // files being truncated are the copies — verify this is intended.
    rur.unlinkBlock(1);
    truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
    if(!copyOnTruncate) {
      // update RUR with the new length
      rur.setNumBytes(newlength);
    } else {
      // Copying block to a new block with new blockId.
      // Not truncating original block.
      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
          newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(),
          newlength);
      newReplicaInfo.setNumBytes(newlength);
      volumeMap.add(bpid, newReplicaInfo);
      finalizeReplica(bpid, newReplicaInfo);
    }
  }
  // NOTE(review): in copy mode with rur.getNumBytes() == newlength, the
  // copied files are never added to volumeMap or finalized — confirm this
  // case cannot occur or is handled elsewhere.

  // finalize the block
  return finalizeReplica(bpid, rur);
}
@Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeReference ref = null; // Use ramdisk only if block size is a multiple of OS page size. // This simplifies reservation for partially used replicas // significantly. if (allowLazyPersist && lazyWriter != null && b.getNumBytes() % cacheManager.getOsPageSize() == 0 && reserveLockedMemory(b.getNumBytes())) { try { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } catch(DiskOutOfSpaceException de) { // Ignore the exception since we just fall back to persistent storage. } finally { if (ref == null) { cacheManager.release(b.getNumBytes()); } } } if (ref == null) { ref = volumes.getNextVolume(storageType, b.getNumBytes()); } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume if (allowLazyPersist && !v.isTransientStorage()) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); } File f; try { f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }