/**
 * Asserts that a decoded block report holds exactly the expected replicas.
 *
 * Every decoded replica must match an expected replica with the same block
 * id on length, generation stamp and state, and no expected replica may be
 * left unmatched.
 *
 * @param expectedReplicas expected replicas keyed by block id
 * @param decodedBlocks the decoded block report under test
 */
private void checkReplicas(Map<Long,Replica> expectedReplicas,
    BlockListAsLongs decodedBlocks) {
  assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());

  // Strike entries off a private copy so the caller's map is left untouched
  // and anything still present at the end is a replica the report missed.
  final Map<Long, Replica> unmatched = new HashMap<>(expectedReplicas);
  for (BlockReportReplica actual : decodedBlocks) {
    assertNotNull(actual);
    final Replica wanted = unmatched.remove(actual.getBlockId());
    assertNotNull(wanted);
    assertEquals("wrong bytes",
        wanted.getNumBytes(), actual.getNumBytes());
    assertEquals("wrong genstamp",
        wanted.getGenerationStamp(), actual.getGenerationStamp());
    assertEquals("wrong replica state",
        wanted.getState(), actual.getState());
  }
  assertTrue(unmatched.isEmpty());
}
@Override public Iterator<BlockReportReplica> iterator() { return new Iterator<BlockReportReplica>() { final BlockReportReplica block = new BlockReportReplica(); final CodedInputStream cis = buffer.newCodedInput(); private int currentBlockIndex = 0; @Override public boolean hasNext() { return currentBlockIndex < numBlocks; } @Override public BlockReportReplica next() { currentBlockIndex++; try { // zig-zag to reduce size of legacy blocks and mask off bits // we don't (yet) understand block.setBlockId(cis.readSInt64()); block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK); block.setGenerationStamp(cis.readRawVarint64()); long state = cis.readRawVarint64() & REPLICA_STATE_MASK; block.setState(ReplicaState.getState((int)state)); } catch (IOException e) { throw new IllegalStateException(e); } return block; } @Override public void remove() { throw new UnsupportedOperationException(); } }; }
/**
 * Creates a report replica from an existing block.
 *
 * The replica state is copied when the source is itself a
 * {@code BlockReportReplica}; any other block is reported as
 * {@link ReplicaState#FINALIZED}.
 *
 * @param block the block to copy
 */
public BlockReportReplica(Block block) {
  super(block);
  this.state = (block instanceof BlockReportReplica)
      ? ((BlockReportReplica) block).getState()
      : ReplicaState.FINALIZED;
}
/**
 * Stores a block in the next free slot of this datanode.
 *
 * @param blk the block to record
 * @return {@code true} if the block was stored, {@code false} if every slot
 *         is already in use
 */
boolean addBlock(Block blk) {
  final int capacity = blocks.size();
  if (nrBlocks != capacity) {
    blocks.set(nrBlocks, new BlockReportReplica(blk));
    nrBlocks++;
    return true;
  }

  // All slots are taken.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Cannot add block: datanode capacity = " + capacity);
  }
  return false;
}
void formBlockReport() { // fill remaining slots with blocks that do not exist for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) { Block block = new Block(blocks.size() - idx, 0, 0); blocks.set(idx, new BlockReportReplica(block)); } blockReportList = BlockListAsLongs.EMPTY; }
/** check if DFS can handle corrupted blocks properly */ @Test public void testFileCorruption() throws Exception { MiniDFSCluster cluster = null; DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFileCorruption"). setNumFiles(20).build(); try { Configuration conf = new HdfsConfiguration(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); FileSystem fs = cluster.getFileSystem(); util.createFiles(fs, "/srcdat"); // Now deliberately remove the blocks String bpid = cluster.getNamesystem().getBlockPoolId(); DataNode dn = cluster.getDataNodes().get(2); Map<DatanodeStorage, BlockListAsLongs> blockReports = dn.getFSDataset().getBlockReports(bpid); assertTrue("Blocks do not exist on data-dir", !blockReports.isEmpty()); for (BlockListAsLongs report : blockReports.values()) { for (BlockReportReplica brr : report) { LOG.info("Deliberately removing block {}", brr.getBlockName()); cluster.getFsDatasetTestUtils(2).getMaterializedReplica( new ExtendedBlock(bpid, brr)).deleteData(); } } assertTrue("Corrupted replicas not handled properly.", util.checkFiles(fs, "/srcdat")); util.cleanup(fs, "/srcdat"); } finally { if (cluster != null) { cluster.shutdown(); } } }
/**
 * Finds the first block that the datanode reports for a block pool.
 *
 * @param dn the datanode whose block reports are inspected
 * @param bpid the block pool id
 * @return the first reported block wrapped as an {@code ExtendedBlock}, or
 *         {@code null} if no storage reports any block
 */
private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
  final Map<DatanodeStorage, BlockListAsLongs> reportsByStorage =
      dn.getFSDataset().getBlockReports(bpid);
  for (BlockListAsLongs storageReport : reportsByStorage.values()) {
    for (BlockReportReplica firstReplica : storageReport) {
      // Only the very first replica is wanted; stop at once.
      return new ExtendedBlock(bpid, firstReplica);
    }
  }
  return null;
}
void formBlockReport() { // fill remaining slots with blocks that do not exist for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) { Block block = new Block(blocks.size() - idx, 0, 0); blocks.set(idx, new BlockReportReplica(block)); } blockReportList = BlockListAsLongs.encode(blocks); }
/**
 * Returns an iterator with no elements.
 *
 * @return an empty iterator
 */
@Override
public Iterator<BlockReportReplica> iterator() {
  return Collections.<BlockReportReplica>emptyIterator();
}
@Override public long[] getBlockListAsLongs() { // terribly inefficient but only occurs if server tries to transcode // an undecoded buffer into longs - ie. it will never happen but let's // handle it anyway if (numFinalized == -1) { int n = 0; for (Replica replica : this) { if (replica.getState() == ReplicaState.FINALIZED) { n++; } } numFinalized = n; } int numUc = numBlocks - numFinalized; int size = 2 + 3*(numFinalized+1) + 4*(numUc); long[] longs = new long[size]; longs[0] = numFinalized; longs[1] = numUc; int idx = 2; int ucIdx = idx + 3*numFinalized; // delimiter block longs[ucIdx++] = -1; longs[ucIdx++] = -1; longs[ucIdx++] = -1; for (BlockReportReplica block : this) { switch (block.getState()) { case FINALIZED: { longs[idx++] = block.getBlockId(); longs[idx++] = block.getNumBytes(); longs[idx++] = block.getGenerationStamp(); break; } default: { longs[ucIdx++] = block.getBlockId(); longs[ucIdx++] = block.getNumBytes(); longs[ucIdx++] = block.getGenerationStamp(); longs[ucIdx++] = block.getState().getValue(); break; } } } return longs; }
@Override public Iterator<BlockReportReplica> iterator() { return new Iterator<BlockReportReplica>() { private final BlockReportReplica block = new BlockReportReplica(); final Iterator<Long> iter = values.iterator(); private int currentBlockIndex = 0; @Override public boolean hasNext() { return currentBlockIndex < numBlocks; } @Override public BlockReportReplica next() { if (currentBlockIndex == finalizedBlocks) { // verify the presence of the delimiter block readBlock(); Preconditions.checkArgument(block.getBlockId() == -1 && block.getNumBytes() == -1 && block.getGenerationStamp() == -1, "Invalid delimiter block"); } readBlock(); if (currentBlockIndex++ < finalizedBlocks) { block.setState(ReplicaState.FINALIZED); } else { block.setState(ReplicaState.getState(iter.next().intValue())); } return block; } private void readBlock() { block.setBlockId(iter.next()); block.setNumBytes(iter.next()); block.setGenerationStamp(iter.next()); } @Override public void remove() { throw new UnsupportedOperationException(); } }; }
/**
 * Creates a replica with no values set; private, so it can only be
 * instantiated from within the enclosing top-level class.
 */
private BlockReportReplica() {
  super();
}
/** * processFirstBlockReport is intended only for processing "initial" block * reports, the first block report received from a DN after it registers. * It just adds all the valid replicas to the datanode, without calculating * a toRemove list (since there won't be any). It also silently discards * any invalid blocks, thereby deferring their processing until * the next block report. * @param storageInfo - DatanodeStorageInfo that sent the report * @param report - the initial block report, to be processed * @throws IOException */ private void processFirstBlockReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); assert (storageInfo.getBlockReportCount() == 0); for (BlockReportReplica iblk : report) { ReplicaState reportedState = iblk.getState(); if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } BlockInfoContiguous storedBlock = blocksMap.getStoredBlock(iblk); // If block does not belong to any file, we are done. if (storedBlock == null) continue; // If block is corrupt, mark it and continue to next block. BlockUCState ucState = storedBlock.getBlockUCState(); BlockToMarkCorrupt c = checkReplicaCorrupt( iblk, reportedState, storedBlock, ucState, storageInfo.getDatanodeDescriptor()); if (c != null) { if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. 
queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor()); } continue; } // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoContiguousUnderConstruction)storedBlock) .addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 BlockInfoContiguousUnderConstruction blockUC = (BlockInfoContiguousUnderConstruction) storedBlock; if (namesystem.isInSnapshot(blockUC)) { int numOfReplicas = blockUC.getNumExpectedLocations(); namesystem.incrementSafeBlockCount(numOfReplicas); } //and fall through to next clause } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { addStoredBlockImmediate(storedBlock, storageInfo); } } }
TinyDatanode(int dnIdx, int blockCapacity) throws IOException { this.dnIdx = dnIdx; this.blocks = new ArrayList<BlockReportReplica>(blockCapacity); this.nrBlocks = 0; }
private static StorageBlockReport[] getBlockReports( DataNode dn, String bpid, boolean corruptOneBlockGs, boolean corruptOneBlockLen) { Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists = dn.getFSDataset().getBlockReports(bpid); // Send block report StorageBlockReport[] reports = new StorageBlockReport[perVolumeBlockLists.size()]; boolean corruptedGs = false; boolean corruptedLen = false; int reportIndex = 0; for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { DatanodeStorage dnStorage = kvPair.getKey(); BlockListAsLongs blockList = kvPair.getValue(); // Walk the list of blocks until we find one each to corrupt the // generation stamp and length, if so requested. BlockListAsLongs.Builder builder = BlockListAsLongs.builder(); for (BlockReportReplica block : blockList) { if (corruptOneBlockGs && !corruptedGs) { long gsOld = block.getGenerationStamp(); long gsNew; do { gsNew = rand.nextInt(); } while (gsNew == gsOld); block.setGenerationStamp(gsNew); LOG.info("Corrupted the GS for block ID " + block); corruptedGs = true; } else if (corruptOneBlockLen && !corruptedLen) { long lenOld = block.getNumBytes(); long lenNew; do { lenNew = rand.nextInt((int)lenOld - 1); } while (lenNew == lenOld); block.setNumBytes(lenNew); LOG.info("Corrupted the length for block ID " + block); corruptedLen = true; } builder.add(new BlockReportReplica(block)); } reports[reportIndex++] = new StorageBlockReport(dnStorage, builder.build()); } return reports; }
TinyDatanode(int dnIdx, int blockCapacity) throws IOException { this.dnIdx = dnIdx; this.blocks = Arrays.asList(new BlockReportReplica[blockCapacity]); this.nrBlocks = 0; }
/**
 * Returns an iterator over the blocks in the block report.
 *
 * Implementations may hand back the same mutable replica object from every
 * call to {@code next()}, so do not add the returned blocks to a collection
 * without copying them first.
 *
 * @return Iterator
 */
public abstract Iterator<BlockReportReplica> iterator();