/** * This should be primarily used for testing. * @return clone of replica store in datanode memory */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
@Test public void testMix() { BlockListAsLongs blocks = checkReport( new FinalizedReplica(b1, null, null), new FinalizedReplica(b2, null, null), new ReplicaBeingWritten(b3, null, null, null), new ReplicaWaitingToBeRecovered(b4, null, null)); assertArrayEquals( new long[] { 2, 2, 1, 11, 111, 2, 22, 222, -1, -1, -1, 3, 33, 333, ReplicaState.RBW.getValue(), 4, 44, 444, ReplicaState.RWR.getValue() }, blocks.getBlockListAsLongs()); }
@Test public void testFuzz() throws InterruptedException { Replica[] replicas = new Replica[100000]; Random rand = new Random(0); for (int i=0; i<replicas.length; i++) { Block b = new Block(rand.nextLong(), i, i<<4); switch (rand.nextInt(2)) { case 0: replicas[i] = new FinalizedReplica(b, null, null); break; case 1: replicas[i] = new ReplicaBeingWritten(b, null, null, null); break; case 2: replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null); break; } } checkReport(replicas); }
@Override public void injectCorruptReplica(ExtendedBlock block) throws IOException { Preconditions.checkState(!dataset.contains(block), "Block " + block + " already exists on dataset."); try (FsVolumeReferences volRef = dataset.getFsVolumeReferences()) { FsVolumeImpl volume = (FsVolumeImpl) volRef.get(0); FinalizedReplica finalized = new FinalizedReplica( block.getLocalBlock(), volume, volume.getFinalizedDir(block.getBlockPoolId())); File blockFile = finalized.getBlockFile(); if (!blockFile.createNewFile()) { throw new FileExistsException( "Block file " + blockFile + " already exists."); } File metaFile = FsDatasetUtil.getMetaFile(blockFile, 1000); if (!metaFile.createNewFile()) { throw new FileExistsException( "Meta file " + metaFile + " already exists." ); } } }
/** * Returns a clone of a replica stored in data-node memory. * Should be primarily used for testing. * @param blockId * @return */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica)r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline)r); } return null; }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. if (replicaInfo.getState() == ReplicaState.FINALIZED ) { return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, b.getNumBytes()); } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten)replicaInfo; } }
/** * Returns a clone of a replica stored in data-node memory. * Should be primarily used for testing. * * @param blockId * @return */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); if (r == null) { return null; } switch (r.getState()) { case FINALIZED: return new FinalizedReplica((FinalizedReplica) r); case RBW: return new ReplicaBeingWritten((ReplicaBeingWritten) r); case RWR: return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered) r); case RUR: return new ReplicaUnderRecovery((ReplicaUnderRecovery) r); case TEMPORARY: return new ReplicaInPipeline((ReplicaInPipeline) r); } return null; }
@Override // FsDatasetSpi public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. if (replicaInfo.getState() == ReplicaState.FINALIZED) { return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, b.getNumBytes()); } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten) replicaInfo; } }
@Override // FsDatasetSpi public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client // re-opens the connection and retries sending those packets. // The other reason is that an "append" is occurring to this block. // check the validity of the parameter if (newGS < b.getGenerationStamp()) { throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp"); } ReplicaInfo replicaInfo = getReplicaInfo(b); LOG.info("Appending to " + replicaInfo); if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNFINALIZED_REPLICA + b); } if (replicaInfo.getNumBytes() != expectedBlockLen) { throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + expectedBlockLen); } FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaBeingWritten replica = null; try { replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(replica, ref); }
/** * Get the list of finalized blocks from in-memory blockmap for a block pool. */ @Override public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) { ArrayList<FinalizedReplica> finalized = new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { if(b.getState() == ReplicaState.FINALIZED) { finalized.add(new FinalizedReplica((FinalizedReplica)b)); } } return finalized; }
/** * Get the list of finalized blocks from in-memory blockmap for a block pool. */ @Override public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) { ArrayList<FinalizedReplica> finalized = new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { if(!b.getVolume().isTransientStorage() && b.getState() == ReplicaState.FINALIZED) { finalized.add(new FinalizedReplica((FinalizedReplica)b)); } } return finalized; }
@Test public void testFinalized() { BlockListAsLongs blocks = checkReport( new FinalizedReplica(b1, null, null)); assertArrayEquals( new long[] { 1, 0, 1, 11, 111, -1, -1, -1 }, blocks.getBlockListAsLongs()); }
@Test public void testRemove() { // Test 1: null argument throws invalid argument exception try { map.remove(bpid, null); fail("Expected exception not thrown"); } catch (IllegalArgumentException expected) { } // Test 2: remove failure - generation stamp mismatch Block b = new Block(block); b.setGenerationStamp(0); assertNull(map.remove(bpid, b)); // Test 3: remove failure - blockID mismatch b.setGenerationStamp(block.getGenerationStamp()); b.setBlockId(0); assertNull(map.remove(bpid, b)); // Test 4: remove success assertNotNull(map.remove(bpid, block)); // Test 5: remove failure - invalid blockID assertNull(map.remove(bpid, 0)); // Test 6: remove success map.add(bpid, new FinalizedReplica(block, null, null)); assertNotNull(map.remove(bpid, block.getBlockId())); }
@Test public void testDuplicateReplicaResolution() throws IOException { FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class); File f1 = new File("d1/block"); File f2 = new File("d2/block"); ReplicaInfo replicaOlder = new FinalizedReplica(1,1,1,fsv1,f1); ReplicaInfo replica = new FinalizedReplica(1,2,2,fsv1,f1); ReplicaInfo replicaSame = new FinalizedReplica(1,2,2,fsv1,f1); ReplicaInfo replicaNewer = new FinalizedReplica(1,3,3,fsv1,f1); ReplicaInfo replicaOtherOlder = new FinalizedReplica(1,1,1,fsv2,f2); ReplicaInfo replicaOtherSame = new FinalizedReplica(1,2,2,fsv2,f2); ReplicaInfo replicaOtherNewer = new FinalizedReplica(1,3,3,fsv2,f2); // equivalent path so don't remove either assertNull(BlockPoolSlice.selectReplicaToDelete(replicaSame, replica)); assertNull(BlockPoolSlice.selectReplicaToDelete(replicaOlder, replica)); assertNull(BlockPoolSlice.selectReplicaToDelete(replicaNewer, replica)); // keep latest found replica assertSame(replica, BlockPoolSlice.selectReplicaToDelete(replicaOtherSame, replica)); assertSame(replicaOtherOlder, BlockPoolSlice.selectReplicaToDelete(replicaOtherOlder, replica)); assertSame(replica, BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); }
@Override public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block) throws IOException { FsVolumeImpl vol = (FsVolumeImpl) volume; ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); dataset.volumeMap.add(block.getBlockPoolId(), info); info.getBlockFile().createNewFile(); info.getMetaFile().createNewFile(); return info; }
@Override public Replica createReplicaUnderRecovery( ExtendedBlock block, long recoveryId) throws IOException { try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0); ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica( block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()), recoveryId ); dataset.volumeMap.add(block.getBlockPoolId(), rur); return rur; } }