public DatanodeCommand blockReportNew(DatanodeRegistration nodeReg, BlockReport rep) throws IOException { if (runInfo.shutdown || !runInfo.isRunning) { return null; } if (ignoreDatanodes()) { LOG.info("Standby fell behind. Telling " + nodeReg.toString() + " to back off"); // Do not process block reports yet as the ingest thread is catching up return AvatarDatanodeCommand.BACKOFF; } if (currentAvatar == Avatar.STANDBY) { Collection<Block> failed = super.blockReportWithRetries(nodeReg, rep); // standby should send only DNA_RETRY BlockCommand bCmd = new BlockCommand(DatanodeProtocols.DNA_RETRY, failed.toArray(new Block[failed.size()])); return bCmd; } else { // only the primary can send DNA_FINALIZE return super.blockReport(nodeReg, rep); } }
/**
 * Splits the bucket indexes of a block report into those whose reported hash
 * equals a hash stored for the datanode and those that do not.
 *
 * <p>A bucket index counts as matched when at least one stored
 * {@code HashBucket} has that bucket id and a hash equal to the hash the
 * report carries at the same index.
 *
 * @param dn the datanode whose stored buckets are looked up
 * @param report the incoming block report
 * @return the matched and mismatched bucket indexes
 * @throws IOException declared by the signature; presumably raised by the
 *         stored-bucket lookup (not visible here)
 */
private HashMatchingResult calculateMismatchedHashes(DatanodeDescriptor dn,
    BlockReport report) throws IOException {
  final List<HashBucket> storedBuckets =
      HashBuckets.getInstance().getBucketsForDatanode(dn);
  final List<Integer> matching = new ArrayList<>();
  final List<Integer> mismatching = new ArrayList<>();

  for (int bucketId = 0; bucketId < report.getBuckets().length; bucketId++) {
    boolean sameHash = false;
    for (HashBucket stored : storedBuckets) {
      if (stored.getBucketId() != bucketId) {
        continue;
      }
      if (stored.getHash() == report.getHashes()[bucketId]) {
        sameHash = true;
        break;
      }
    }
    (sameHash ? matching : mismatching).add(bucketId);
  }

  return new HashMatchingResult(matching, mismatching);
}
void register() throws IOException { // get versions from the namenode nsInfo = nameNodeProto.versionRequest(); dnRegistration = new DatanodeRegistration( new DatanodeID(DNS.getDefaultIP("default"), DNS.getDefaultHost("default", "default"), "", getNodePort(dnIdx), DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT), new DataStorage(nsInfo, ""), new ExportedBlockKeys(), VersionInfo.getVersion()); DataNode.setNewStorageID(dnRegistration); // register datanode dnRegistration = nameNodeProto.registerDatanode(dnRegistration); //first block reports storage = new DatanodeStorage(dnRegistration.getStorageID()); final StorageBlockReport[] reports = {new StorageBlockReport(storage, BlockReport.builder(NUM_BUCKETS).build())}; nameNodeProto.blockReport(dnRegistration, nameNode.getNamesystem().getBlockPoolId(), reports); }
@Test public void testSafeModeIBRAfterIncremental() throws Exception { DatanodeDescriptor node = spy(nodes.get(0)); node.setStorageID("dummy-storage"); node.isAlive = true; DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); // pretend to be in safemode doReturn(true).when(fsn).isInStartupSafeMode(); // register new node bm.getDatanodeManager().registerDatanode(nodeReg); bm.getDatanodeManager().addDatanode(node); // swap in spy assertEquals(node, bm.getDatanodeManager().getDatanode(node)); assertTrue(node.isFirstBlockReport()); // send block report while pretending to already have blocks reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, "pool", BlockReport.builder(numBuckets).build()); verify(node).receivedBlockReport(); assertFalse(node.isFirstBlockReport()); }
/** * Test creates a file and closes it. * The second datanode is started in the cluster. * As soon as the replication process is completed test runs * Block report and checks that no underreplicated blocks are left * * @throws IOException * in case of an error */ @Test public void blockReport_06() throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); startDNandWait(filePath, true); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] report = {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()), BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())}; cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); Thread.sleep(10000); //HOP: wait for the replication monitor to catch up assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); }
public DatanodeCommand blockReportNew(DatanodeRegistration nodeReg, BlockReport rep) throws IOException { if (runInfo.shutdown || !runInfo.isRunning) { return null; } if (ignoreDatanodes()) { LOG.info("Standby fell behind. Telling " + nodeReg.toString() + " to back off"); // Do not process block reports yet as the ingest thread is catching up return AvatarDatanodeCommand.BACKOFF; } if (currentAvatar == Avatar.STANDBY) { Collection<Block> failed = super.blockReportWithRetries(nodeReg, rep); BlockCommand bCmd = new BlockCommand(DatanodeProtocols.DNA_RETRY, failed.toArray(new Block[failed.size()])); return bCmd; } else { return super.blockReport(nodeReg, rep); } }
/**
 * Add new replica blocks to the Inode to target mapping and also add the
 * Inode file to DataNodeDesc.
 *
 * <p>The request is verified first and the report is handed to the
 * namesystem. The outcome is logged at info level, with a
 * " was discarded." suffix when the namesystem did not process the report.
 *
 * @param nodeReg registration of the reporting datanode
 * @param blocks report of the blocks currently being written
 * @throws IOException if request verification or report processing fails
 */
public void blocksBeingWrittenReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);

  BlockListAsLongs blist =
      new BlockListAsLongs(blocks.getBlockReportInLongs());
  boolean accepted =
      namesystem.processBlocksBeingWrittenReport(nodeReg, blist);

  String summary = "*BLOCK* NameNode.blocksBeingWrittenReport: "
      + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
      + " blocks";
  String suffix = accepted ? "" : " was discarded.";
  stateChangeLog.info(summary + suffix);
}
/**
 * Verifies the reporting datanode, counts the report in the metrics and
 * hands the block list to the namesystem.
 *
 * @param nodeReg registration of the reporting datanode
 * @param blocks the block report
 * @return the collection returned by the namesystem's {@code processReport}
 *         (callers in this code base resend these blocks as a retry command)
 * @throws IOException if request verification or report processing fails
 */
protected Collection<Block> blockReportWithRetries(
    DatanodeRegistration nodeReg, BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);
  myMetrics.numBlockReport.inc();

  BlockListAsLongs blist =
      new BlockListAsLongs(blocks.getBlockReportInLongs());

  // Block reports are frequent; only pay for building the message when the
  // debug level is actually enabled.
  if (stateChangeLog.isDebugEnabled()) {
    stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + "from "
        + nodeReg.getName() + " " + blist.getNumberOfBlocks() + " blocks");
  }

  return namesystem.processReport(nodeReg, blist);
}
/**
 * Sends a 'Blocks Being Written' report to the given node.
 *
 * <p>Nothing is sent when the dataset has no blocks being written for the
 * namespace (a {@code null} or empty array).
 *
 * @param node the node to send the report to
 * @param namespaceId namespace whose blocks being written are reported
 * @param nsRegistration registration passed along with the report
 * @throws IOException if the remote call fails
 */
public void sendBlocksBeingWrittenReport(DatanodeProtocol node,
    int namespaceId, DatanodeRegistration nsRegistration) throws IOException {
  Block[] beingWritten = data.getBlocksBeingWrittenReport(namespaceId);
  if (beingWritten == null || beingWritten.length == 0) {
    return;
  }

  BlockReport bbwReport =
      new BlockReport(BlockListAsLongs.convertToArrayLongs(beingWritten));
  node.blocksBeingWrittenReport(nsRegistration, bbwReport);
}
@Override // DatanodeProtocol public DatanodeCommand blockReport(DatanodeRegistration nodeReg, String poolId, StorageBlockReport[] reports) throws IOException { verifyRequest(nodeReg); BlockReport blist = reports[0].getReport(); // Assume no federation '0' if (blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug( "*BLOCK* NameNode.blockReport: " + "from " + nodeReg + " " + blist.getNumBlocks() + " blocks"); } namesystem.getBlockManager().processReport(nodeReg, poolId, blist); return new FinalizeCommand(poolId); }
/**
 * Adds the hash of a block replica to the hash of the bucket the block
 * belongs to.
 *
 * @param storageId id of the storage whose bucket is updated
 * @param state replica state that is hashed together with the block
 * @param block the block to account for
 * @throws TransactionContextException declared by the signature; presumably
 *         raised by the bucket lookup or update
 * @throws StorageException declared by the signature; presumably raised by
 *         the bucket lookup or update
 */
public void applyHash(int storageId, HdfsServerConstants.ReplicaState state,
    Block block) throws TransactionContextException, StorageException {
  int bucketId = getBucketForBlock(block);
  HashBucket bucket = getBucket(storageId, bucketId);

  // Hash the block once and reuse the value for both the update and the log.
  final long blockHash = BlockReport.hash(block, state);
  long newHash = bucket.getHash() + blockHash;

  // Avoid building the message (and formatting the block) unless needed.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Applying block:" + blockToString(block) + "sid: " + storageId
        + "state: " + state.name() + ", hash: " + blockHash);
  }

  bucket.setHash(newHash);
}
/**
 * Subtracts the hash of a block replica from the hash of the bucket the
 * block belongs to.
 *
 * @param storageId id of the storage whose bucket is updated
 * @param state replica state that is hashed together with the block
 * @param block the block to remove from the bucket hash
 * @throws TransactionContextException declared by the signature; presumably
 *         raised by the bucket lookup or update
 * @throws StorageException declared by the signature; presumably raised by
 *         the bucket lookup or update
 */
public void undoHash(int storageId, HdfsServerConstants.ReplicaState state,
    Block block) throws TransactionContextException, StorageException {
  int bucketId = getBucketForBlock(block);
  HashBucket bucket = getBucket(storageId, bucketId);

  // Hash the block once and reuse the value for both the update and the log.
  final long blockHash = BlockReport.hash(block, state);
  long newHash = bucket.getHash() - blockHash;

  // Avoid building the message (and formatting the block) unless needed.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Undo block:" + blockToString(block) + "sid: " + storageId
        + "state: " + state.name() + ", hash: " + blockHash);
  }

  bucket.setHash(newHash);
}
/** * Generates a block report from the in-memory block map. */ @Override // FsDatasetSpi public BlockReport getBlockReport(String bpid) { int size = volumeMap.size(bpid); BlockReport.Builder builder = BlockReport.builder(NUM_BUCKETS); if (size == 0) { return builder.build(); } synchronized (this) { for (ReplicaInfo b : volumeMap.replicas(bpid)) { switch (b.getState()) { case FINALIZED: case RBW: case RWR: builder.add(b); break; case RUR: ReplicaUnderRecovery rur = (ReplicaUnderRecovery) b; builder.add(rur.getOriginalReplica()); break; case TEMPORARY: break; default: assert false : "Illegal ReplicaInfo state."; } } return builder.build(); } }
/**
 * Converts a {@code BlockReport} into its protobuf representation.
 *
 * <p>Each bucket becomes one bucket message holding the id, generation
 * stamp, length and state of each of its blocks; the report hashes are
 * copied in order.
 *
 * @param report the report to convert
 * @return the protobuf message
 */
public static DatanodeProtocolProtos.BlockReportProto convert(
    BlockReport report) {
  List<DatanodeProtocolProtos.BlockReportBucketProto> protoBuckets =
      new ArrayList<>();
  for (BlockReportBucket reportBucket : report.getBuckets()) {
    DatanodeProtocolProtos.BlockReportBucketProto.Builder bucketProto =
        DatanodeProtocolProtos.BlockReportBucketProto.newBuilder();
    for (BlockReportBlock reportBlock : reportBucket.getBlocks()) {
      DatanodeProtocolProtos.BlockReportBlockProto.Builder blockProto =
          DatanodeProtocolProtos.BlockReportBlockProto.newBuilder()
              .setBlockId(reportBlock.getBlockId())
              .setGenerationStamp(reportBlock.getGenerationStamp())
              .setLength(reportBlock.getLength())
              .setState(convert(reportBlock.getState()));
      bucketProto.addBlocks(blockProto);
    }
    protoBuckets.add(bucketProto.build());
  }

  // box the primitive hashes so they can be added in one call
  List<Long> boxedHashes = new ArrayList<>();
  for (long bucketHash : report.getHashes()) {
    boxedHashes.add(bucketHash);
  }

  DatanodeProtocolProtos.BlockReportProto.Builder reportProto =
      DatanodeProtocolProtos.BlockReportProto.newBuilder();
  return reportProto
      .addAllBuckets(protoBuckets)
      .addAllHashes(boxedHashes)
      .build();
}
/**
 * Converts a protobuf block report back into a {@code BlockReport}.
 *
 * <p>One hash is read for every bucket in the message, so the message is
 * expected to carry at least as many hashes as buckets.
 *
 * @param blockReportProto the protobuf message
 * @return a report with the decoded buckets, their hashes and the total
 *         number of blocks over all buckets
 */
public static BlockReport convert(
    DatanodeProtocolProtos.BlockReportProto blockReportProto) {
  final int bucketCount = blockReportProto.getBucketsCount();
  final BlockReportBucket[] decodedBuckets =
      new BlockReportBucket[bucketCount];
  final long[] decodedHashes = new long[bucketCount];
  int totalBlocks = 0;

  for (int b = 0; b < bucketCount; b++) {
    DatanodeProtocolProtos.BlockReportBucketProto protoBucket =
        blockReportProto.getBuckets(b);
    final int blocksHere = protoBucket.getBlocksCount();
    totalBlocks += blocksHere;

    BlockReportBlock[] decodedBlocks = new BlockReportBlock[blocksHere];
    for (int k = 0; k < blocksHere; k++) {
      DatanodeProtocolProtos.BlockReportBlockProto protoBlock =
          protoBucket.getBlocks(k);
      decodedBlocks[k] = new BlockReportBlock(
          protoBlock.getBlockId(),
          protoBlock.getGenerationStamp(),
          protoBlock.getLength(),
          convert(protoBlock.getState()));
    }

    BlockReportBucket decodedBucket = new BlockReportBucket();
    decodedBucket.setBlocks(decodedBlocks);
    decodedBuckets[b] = decodedBucket;
    decodedHashes[b] = blockReportProto.getHashes(b);
  }

  return new BlockReport(decodedBuckets, decodedHashes, totalBlocks);
}
/**
 * Collects one block report per datanode for the given block pool.
 *
 * @param bpid block pool id
 * @return block reports from all data nodes; element {@code i} is the
 *         result of {@code getBlockReport(bpid, i)}, so the array follows
 *         the order of the datanode list
 */
public Iterable<BlockReportBlock>[] getAllBlockReports(String bpid) {
  final Iterable<BlockReportBlock>[] reports =
      new BlockReport[dataNodes.size()];
  int dnIndex = 0;
  while (dnIndex < reports.length) {
    reports[dnIndex] = getBlockReport(bpid, dnIndex);
    dnIndex++;
  }
  return reports;
}
void formBlockReport() { // fill remaining slots with blocks that do not exist for (int idx = blocks.size() - 1; idx >= nrBlocks; idx--) { blocks.set(idx, new Block(blocks.size() - idx, 0, 0)); } blockReportList = BlockReport.builder(NUM_BUCKETS).addAllAsFinalized (blocks).build(); }
@Test public void testSafeModeIBR() throws Exception { DatanodeDescriptor node = spy(nodes.get(0)); node.setStorageID("dummy-storage"); node.isAlive = true; DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); // pretend to be in safemode doReturn(true).when(fsn).isInStartupSafeMode(); // register new node bm.getDatanodeManager().registerDatanode(nodeReg); bm.getDatanodeManager().addDatanode(node); // swap in spy assertEquals(node, bm.getDatanodeManager().getDatanode(node)); assertTrue(node.isFirstBlockReport()); // send block report, should be processed reset(node); bm.processReport(node, "pool", BlockReport.builder(numBuckets).build()); verify(node).receivedBlockReport(); assertFalse(node.isFirstBlockReport()); // send block report again, should NOT be processed reset(node); bm.processReport(node, "pool", BlockReport.builder(numBuckets).build()); verify(node, never()).receivedBlockReport(); assertFalse(node.isFirstBlockReport()); // re-register as if node restarted, should update existing node bm.getDatanodeManager().removeDatanode(node); reset(node); bm.getDatanodeManager().registerDatanode(nodeReg); verify(node).updateRegInfo(nodeReg); assertTrue(node.isFirstBlockReport()); // ready for report again // send block report, should be processed after restart reset(node); bm.processReport(node, "pool", BlockReport.builder(numBuckets).build()); verify(node).receivedBlockReport(); assertFalse(node.isFirstBlockReport()); }
/**
 * Builds a block report containing every finalized block stored for the
 * given block pool.
 *
 * @param bpid block pool id
 * @return the block report; empty when nothing is stored for the pool
 */
@Override
public synchronized BlockReport getBlockReport(String bpid) {
  // The block list the original allocated here was never read, so the
  // builder is the only accumulator.
  final BlockReport.Builder builder = BlockReport.builder(NUM_BUCKETS);
  final Map<Block, BInfo> map = blockMap.get(bpid);
  if (map != null) {
    for (BInfo b : map.values()) {
      if (b.isFinalized()) {
        builder.addAsFinalized(b.theBlock);
      }
    }
  }
  return builder.build();
}
/**
 * Checks that the simulated dataset reports no blocks when fresh and, after
 * blocks were added, reports {@code NUMBLOCKS} blocks whose lengths match
 * the length derived from their ids.
 */
@Test
public void testGetBlockReport() throws IOException {
  SimulatedFSDataset fsdataset = getSimulatedFSDataset();

  BlockReport emptyReport = fsdataset.getBlockReport(bpid);
  assertEquals(0, emptyReport.getNumBlocks());

  addSomeBlocks(fsdataset);
  BlockReport populatedReport = fsdataset.getBlockReport(bpid);
  assertEquals(NUMBLOCKS, populatedReport.getNumBlocks());

  for (BlockReportBlock reported : populatedReport) {
    assertNotNull(reported);
    assertEquals(blockIdToLen(reported.getBlockId()), reported.getLength());
  }
}
/** * Test writes a file and closes it. Then test finds a block * and changes its GS to be < of original one. * New empty block is added to the list of blocks. * Block report is forced and the check for # of corrupted blocks is * performed. * * @throws IOException * in case of an error */ @Test public void blockReport_03() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); ArrayList<Block> blocks = prepareForRide(filePath, METHOD_NAME, FILE_SIZE); // The block with modified GS won't be found. Has to be deleted blocks.get(0).setGenerationStampNoPersistance(rand.nextLong()); // This new block is unknown to NN and will be mark for deletion. blocks.add(new Block()); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] report = {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()), BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())}; DatanodeCommand dnCmd = cluster.getNameNodeRpc().blockReport(dnR, poolId, report); if (LOG.isDebugEnabled()) { LOG.debug("Got the command: " + dnCmd); } printStats(); assertEquals( "Wrong number of CorruptedReplica+PendingDeletion " + "blocks is found", 2, cluster.getNamesystem().getCorruptReplicaBlocks() + cluster.getNamesystem().getPendingDeletionBlocks()); }
@Test public void blockReportRegrssion() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); ArrayList<Block> blocks = new ArrayList<>(); for(int i = 0 ; i < 3; i++){ Path filePath = new Path("/" + METHOD_NAME +i+ ".dat"); DFSTestUtil .createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong()); blocks.addAll(locatedToBlocks(cluster.getNameNodeRpc() .getBlockLocations(filePath.toString(), FILE_START, FILE_SIZE) .getLocatedBlocks(), null)); } if (LOG.isDebugEnabled()) { LOG.debug("Number of blocks allocated " + blocks.size()); } // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] report = {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()), BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())}; try{ cluster.getNameNodeRpc().blockReport(dnR, poolId, report); }catch(Exception e){ fail("No exception was expected. Get "+e); } }
/**
 * Add new replica blocks to the Inode to target mapping and also add the
 * Inode file to DataNodeDesc.
 *
 * <p>The request is verified, the report is handed to the namesystem and a
 * summary line is logged at info level.
 *
 * @param nodeReg registration of the reporting datanode
 * @param blocks report of the blocks currently being written
 * @throws IOException if request verification or report processing fails
 */
public void blocksBeingWrittenReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);

  final BlockListAsLongs blist =
      new BlockListAsLongs(blocks.getBlockReportInLongs());
  namesystem.processBlocksBeingWrittenReport(nodeReg, blist);

  final String summary = "*BLOCK* NameNode.blocksBeingWrittenReport: "
      + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
      + " blocks";
  stateChangeLog.info(summary);
}
/**
 * Adapter overload: unpacks the report into its long-array form and
 * forwards it to the overload of {@code blockReport} that accepts that form.
 *
 * @param nodeReg registration of the reporting datanode
 * @param blocks the block report
 * @return the command returned by the delegate
 * @throws IOException propagated from the delegate
 */
@Override
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  final long[] blocksAsLongs = blocks.getBlockReportInLongs();
  return blockReport(nodeReg, blocksAsLongs);
}
/** * The given datanode is reporting all its blocks. * Update the (machine-->blocklist) and (block-->machinelist) maps. */ public void processReport(final DatanodeID nodeID, final String poolId, final BlockReport newReport) throws IOException { final long startTime = Time.now(); //after acquiring write lock final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); if (node == null || !node.isAlive) { throw new IOException( "ProcessReport from dead or unregistered node: " + nodeID); } // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) { blockLog.info("BLOCK* processReport: " + "discarded non-initial block report from " + nodeID + " because namenode still in startup phase"); return; } ReportStatistics reportStatistics = processReport(node, newReport); // Now that we have an up-to-date block report, we know that any // deletions from a previous NN iteration have been accounted for. boolean staleBefore = node.areBlockContentsStale(); node.receivedBlockReport(); if (staleBefore && !node.areBlockContentsStale()) { LOG.info( "BLOCK* processReport: Received first block report from " + node + " after becoming active. Its block contents are no longer" + " considered stale"); rescanPostponedMisreplicatedBlocks(); } final long endTime = Time.now(); // Log the block report processing stats from Namenode perspective final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); if (metrics != null) { metrics.addBlockReport((int) (endTime - startTime)); } blockLog.info("BLOCK* processReport: from " + nodeID + ", blocks: " + newReport.getNumBlocks() + ", processing time: " + (endTime - startTime) + " ms. " + reportStatistics); }
/** * Report the list blocks to the Namenode * * @throws IOException */ DatanodeCommand blockReport() throws IOException { // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); if (startTime - lastBlockReport > dnConf.blockReportInterval) { // Flush any block information that precedes the block report. Otherwise // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. reportReceivedDeletedBlocks(); // Create block report long brCreateStartTime = now(); BlockReport bReport = dn.getFSDataset().getBlockReport(getBlockPoolId()); // Send block report long brSendStartTime = now(); StorageBlockReport[] report = {new StorageBlockReport( new DatanodeStorage(bpRegistration.getStorageID()), bReport)}; ActiveNode an = nextNNForBlkReport(bReport.getNumBlocks()); if (an != null) { blkReportHander = getAnActor(an.getRpcServerAddressForDatanodes()); if (blkReportHander == null || !blkReportHander.isInitialized()) { return null; //no one is ready to handle the request, return now without changing the values of lastBlockReport. it will be retried in next cycle } } else { LOG.warn("Unable to send block report. Current namenodes are: "+ Arrays.toString(nnList.toArray())); return null; } cmd = blkReportHander.blockReport(bpRegistration, getBlockPoolId(), report); // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; dn.getMetrics().addBlockReport(brSendCost); LOG.info( "BlockReport of " + bReport.getNumBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"); // If we have sent the first block report, then wait a random // time before we start the periodic block reports. 
if (resetBlockReportTime) { lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int) (dnConf.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report * should have started around 9:20:14 (default 1 hour interval). * If current time is : * 1) normal like 9:20:18, next report should be at 10:20:14 * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / dnConf.blockReportInterval * dnConf.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } return cmd; }
/**
 * @return the block report currently held in {@code blockReportList}
 */
BlockReport getBlockReportList() {
  return this.blockReportList;
}
/**
 * Test writes a file, then removes two randomly chosen blocks from the
 * datanode's storage and forces a block report that leaves them out. The
 * namesystem is pushed to recompute the datanode work, after which the
 * numbers of missing and of under-replicated blocks must both equal the
 * number of removed blocks (single-DN cluster).
 *
 * @throws IOException
 *     in case of errors
 */
@Test
public void blockReport_02() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  LOG.info("Running test " + METHOD_NAME);

  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR,
      rand.nextLong());

  // mock around with newly created blocks and delete some
  File dataDir = new File(cluster.getDataDirectory());
  assertTrue(dataDir.isDirectory());

  List<ExtendedBlock> blocks2Remove = new ArrayList<>();
  List<Integer> removedIndex = new ArrayList<>();
  List<LocatedBlock> lBlocks = cluster.getNameNodeRpc()
      .getBlockLocations(filePath.toString(), FILE_START, FILE_SIZE)
      .getLocatedBlocks();

  // pick two distinct random block indexes
  while (removedIndex.size() != 2) {
    int candidate = rand.nextInt(lBlocks.size());
    if (removedIndex.contains(candidate)) {
      continue;
    }
    removedIndex.add(candidate);
  }
  for (Integer chosen : removedIndex) {
    blocks2Remove.add(lBlocks.get(chosen).getBlock());
  }

  ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of blocks allocated " + lBlocks.size());
  }

  final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
  for (ExtendedBlock doomed : blocks2Remove) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Removing the block " + doomed.getBlockName());
    }
    for (File blockFile : findAllFiles(dataDir,
        new MyFileFilter(doomed.getBlockName(), true))) {
      DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(doomed);
      if (!blockFile.delete()) {
        LOG.warn("Couldn't delete " + doomed.getBlockName());
      }
    }
  }

  waitTil(DN_RESCAN_EXTRA_WAIT);

  // all blocks belong to the same file, hence same BP
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
  StorageBlockReport[] report = {
      new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
          BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())
  };
  cluster.getNameNodeRpc().blockReport(dnR, poolId, report);

  BlockManagerTestUtil.getComputedDatanodeWork(
      cluster.getNamesystem().getBlockManager());

  printStats();

  assertEquals("Wrong number of MissingBlocks is found",
      blocks2Remove.size(),
      cluster.getNamesystem().getMissingBlocksCount());
  assertEquals("Wrong number of UnderReplicatedBlocks is found",
      blocks2Remove.size(),
      cluster.getNamesystem().getUnderReplicatedBlocks());
}
/** * The test set the configuration parameters for a large block size and * restarts initiated single-node cluster. * Then it writes a file > block_size and closes it. * The second datanode is started in the cluster. * As soon as the replication process is started and at least one TEMPORARY * replica is found test forces BlockReport process and checks * if the TEMPORARY replica isn't reported on it. * Eventually, the configuration is being restored into the original state. * * @throws IOException * in case of an error */ @Test public void blockReport_08() throws IOException, InterruptedException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; final int bytesChkSum = 1024 * 1000; conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum); shutDownCluster(); startUpCluster(); try { ArrayList<Block> blocks = writeFile(METHOD_NAME, 12 * bytesChkSum, filePath); Block bl = findBlock(filePath, 12 * bytesChkSum); BlockChecker bc = new BlockChecker(filePath); bc.start(); waitForTempReplica(bl, DN_N1); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] report = {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()), BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())}; cluster.getNameNodeRpc().blockReport(dnR, poolId, report); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); printStats(); try { bc.join(); } catch (InterruptedException e) { } } finally { resetConfiguration(); // return the initial state of the configuration } }
@Test public void blockReport_09() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; final int bytesChkSum = 1024 * 1000; conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum); shutDownCluster(); startUpCluster(); // write file and start second node to be "older" than the original try { ArrayList<Block> blocks = writeFile(METHOD_NAME, 12 * bytesChkSum, filePath); Block bl = findBlock(filePath, 12 * bytesChkSum); BlockChecker bc = new BlockChecker(filePath); bc.start(); corruptBlockGS(bl); corruptBlockLen(bl); waitForTempReplica(bl, DN_N1); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] report = {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()), BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())}; cluster.getNameNodeRpc().blockReport(dnR, poolId, report); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); printStats(); try { bc.join(); } catch (InterruptedException e) { } } finally { resetConfiguration(); // return the initial state of the configuration } }
/**
 * This method should not be invoked on the composite
 * DatanodeProtocols object. You can call these on the individual
 * DatanodeProtocol objects.
 *
 * @param registration ignored
 * @param blocks ignored
 * @throws IOException always
 */
public void blocksBeingWrittenReport(DatanodeRegistration registration,
    BlockReport blocks) throws IOException {
  // Name this method in the message; the original reused the "blockReport"
  // prefix of its sibling, which pointed callers at the wrong call.
  throw new IOException("blocksBeingWrittenReport" + errMessage);
}
/**
 * Unsupported on the composite DatanodeProtocols object; block reports have
 * to be sent through the individual DatanodeProtocol objects instead.
 *
 * @param registration ignored
 * @param blocks ignored
 * @return never returns normally
 * @throws IOException always
 */
public DatanodeCommand blockReport(DatanodeRegistration registration,
    BlockReport blocks) throws IOException {
  final String reason = "blockReport" + errMessage;
  throw new IOException(reason);
}
/**
 * Returns the block report - the full list of blocks stored under a
 * block pool.
 *
 * NOTE(review): which replica states end up in the report is decided by
 * the implementation and is not specified here - confirm against the
 * implementing classes.
 *
 * @param bpid
 *     Block Pool Id
 * @return the block report - the full list of blocks stored
 */
public BlockReport getBlockReport(String bpid);
/**
 * Submits a block report on behalf of a registered datanode.
 *
 * NOTE(review): only the signature is visible here. Judging by it, the
 * returned command is what the datanode should act on next; whether
 * {@code null} is a legal return value depends on the implementation -
 * confirm before relying on a non-null result.
 *
 * @param reg registration of the reporting datanode
 * @param rep the block report
 * @return the command produced for the datanode
 * @throws IOException if the report cannot be delivered or processed
 */
public DatanodeCommand blockReportNew(DatanodeRegistration reg, BlockReport rep) throws IOException;