/**
 * Add a thread which periodically triggers deletion reports,
 * heartbeats, and NN-side block work.
 * @param interval millisecond period on which to run
 */
public void addReplicationTriggerThread(final int interval) {
  testCtx.addThread(new RepeatingTestThread(testCtx) {
    @Override
    public void doAnAction() throws Exception {
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerDeletionReport(dn);
        DataNodeTestUtils.triggerHeartbeat(dn);
      }
      for (int i = 0; i < 2; i++) {
        NameNode nn = cluster.getNameNode(i);
        BlockManagerTestUtil.computeAllPendingWork(
            nn.getNamesystem().getBlockManager());
      }
      Thread.sleep(interval);
    }
  });
}
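// Usage sketch (assumption): a stress test built on a harness like
// HAStressTestHarness would typically register this trigger thread alongside
// its other worker threads, start them, run the workload, and then stop them.
// The harness variable and the startThreads()/stopThreads() calls below are
// illustrative assumptions about the surrounding test harness, not part of
// the method above.
HAStressTestHarness harness = new HAStressTestHarness();
harness.addReplicationTriggerThread(2000); // trigger every 2 seconds
harness.startThreads();
// ... exercise the cluster (e.g. write and delete files) ...
harness.stopThreads();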
/**
 * Test for the following case proposed by ATM:
 * 1. Both NNs are up, one is active. There are 100 blocks. Both are
 *    out of safemode.
 * 2. 10 block deletions get processed by NN1. NN2 enqueues these DN messages
 *    until it next reads from a checkpointed edits file.
 * 3. NN2 gets restarted. Its queues are lost.
 * 4. NN2 comes up, reads from all the finalized edits files. Concludes there
 *    should still be 100 blocks.
 * 5. NN2 receives a block report from all the DNs, which only accounts for
 *    90 blocks. It doesn't leave safemode.
 * 6. NN1 dies or is transitioned to standby.
 * 7. NN2 is transitioned to active. It reads all the edits from NN1. It now
 *    knows there should only be 90 blocks, but it's still in safemode.
 * 8. NN2 doesn't ever recheck whether it should leave safemode.
 *
 * This is essentially the inverse of {@link #testBlocksAddedBeforeStandbyRestart()}.
 */
@Test
public void testBlocksRemovedBeforeStandbyRestart() throws Exception {
  banner("Starting with NN0 active and NN1 standby, creating some blocks");
  DFSTestUtil.createFile(fs, new Path("/test"), 5 * BLOCK_SIZE, (short) 3, 1L);

  // Roll edit log so that, when the SBN restarts, it will load
  // the namespace during startup.
  nn0.getRpcServer().rollEditLog();

  // Delete those blocks again, so they won't get reported to the SBN
  // once it starts up
  banner("Removing the blocks without rolling the edit log");
  fs.delete(new Path("/test"), true);
  BlockManagerTestUtil.computeAllPendingWork(
      nn0.getNamesystem().getBlockManager());
  cluster.triggerHeartbeats();

  banner("Restarting standby");
  restartStandby();
  assertSafeMode(nn1, 0, 5, 3, 0);

  banner("Waiting for standby to catch up to active namespace");
  HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
  assertSafeMode(nn1, 0, 0, 3, 0);
}
@Override
void generateInputs(int[] ignore) throws IOException {
  final FSNamesystem namesystem = nameNode.getNamesystem();
  // start data-nodes; create a bunch of files; generate block reports.
  blockReportObject.generateInputs(ignore);
  // stop replication monitor
  BlockManagerTestUtil.stopReplicationThread(namesystem.getBlockManager());
  // report blocks once
  int nrDatanodes = blockReportObject.getNumDatanodes();
  for (int idx = 0; idx < nrDatanodes; idx++) {
    blockReportObject.executeOp(idx, 0, null);
  }
  // decommission data-nodes
  decommissionNodes();
  // set node replication limit
  BlockManagerTestUtil.setNodeReplicationLimit(namesystem.getBlockManager(),
      nodeReplicationLimit);
}
/**
 * Verify the following scenario:
 * 1. NN restarts.
 * 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
 * 3. After reregistration completes, DN will send a heartbeat, followed by
 *    a block report.
 * 4. NN will mark DatanodeStorageInfo#blockContentsStale as false.
 */
@Test(timeout = 60000)
public void testStorageBlockContentsStaleAfterNNRestart() throws Exception {
  MiniDFSCluster dfsCluster = null;
  try {
    Configuration config = new Configuration();
    dfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
    dfsCluster.waitActive();
    dfsCluster.restartNameNode(true);
    BlockManagerTestUtil.checkHeartbeat(
        dfsCluster.getNamesystem().getBlockManager());
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName mxbeanNameFsns = new ObjectName(
        "Hadoop:service=NameNode,name=FSNamesystemState");
    Integer numStaleStorages = (Integer) (mbs.getAttribute(
        mxbeanNameFsns, "NumStaleStorages"));
    assertEquals(0, numStaleStorages.intValue());
  } finally {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
  }
}
@Test
public void testProcessErasureCodingTasksSubmitionShouldSucceed()
    throws Exception {
  DataNode dataNode = cluster.dataNodes.get(0).datanode;
  // Pack invalid (dummy) parameters into ecTasks. Irrespective of the
  // parameters, each submission to the task thread pool should succeed, so
  // that an exception from one task does not prevent the remaining tasks in
  // the list from being processed.
  int size = cluster.dataNodes.size();
  byte[] liveIndices = new byte[size];
  DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1];
  DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
      .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
          new DatanodeStorage("s01"));
  DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[] {
      targetDnInfos_1 };
  BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
      new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
      ErasureCodingPolicyManager.getSystemDefaultPolicy());
  List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
  ecTasks.add(invalidECInfo);
  dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
}
/**
 * Verify the support for decommissioning a datanode that is already dead.
 * Under this scenario the datanode should immediately be marked as
 * DECOMMISSIONED.
 */
@Test(timeout = 120000)
public void testDecommissionDeadDN() throws IOException, InterruptedException,
    TimeoutException {
  DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
  String dnName = dnID.getXferAddr();
  DataNodeProperties stoppedDN = cluster.stopDataNode(0);
  DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false,
      30000);
  FSNamesystem fsn = cluster.getNamesystem();
  final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
  DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
  decommissionNode(fsn, localFileSys, dnName);
  dm.refreshNodes(conf);
  BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
  assertTrue(dnDescriptor.isDecommissioned());

  // Add the node back
  cluster.restartDataNode(stoppedDN, true);
  cluster.waitActive();

  // Call refreshNodes on FSNamesystem with empty exclude file to remove the
  // datanode from decommissioning list and make it available again.
  writeConfigFile(localFileSys, excludeFile, null);
  dm.refreshNodes(conf);
}
/** * wait for datanode to reach alive or dead state for waitTime given in * milliseconds. */ private void waitForDatanodeState(String nodeID, boolean alive, int waitTime) throws TimeoutException, InterruptedException { long stopTime = Time.now() + waitTime; FSNamesystem namesystem = cluster.getNamesystem(); String state = alive ? "alive" : "dead"; while (Time.now() < stopTime) { final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode( namesystem, nodeID); if (dd.isAlive == alive) { LOG.info("datanode " + nodeID + " is " + state); return; } LOG.info("Waiting for datanode " + nodeID + " to become " + state); Thread.sleep(1000); } throw new TimeoutException("Timedout waiting for datanode reach state " + state); }
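// Hypothetical call site for the helper above: after stopping a datanode,
// wait up to 30 seconds for the NameNode to consider it dead. The dnID and
// cluster variables, and whether the identifier passed is the datanode UUID
// or its transfer address, are assumptions about the enclosing test and
// about what BlockManagerTestUtil.getDatanode() expects here.
cluster.stopDataNode(0);
waitForDatanodeState(dnID.getDatanodeUuid(), false, 30000);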
/** * wait for datanode to reach alive or dead state for waitTime given in * milliseconds. */ private void waitForDatanodeState(String nodeID, boolean alive, int waitTime) throws TimeoutException, InterruptedException { long stopTime = Time.now() + waitTime; FSNamesystem namesystem = cluster.getNamesystem(); String state = alive ? "alive" : "dead"; while (Time.now() < stopTime) { final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(namesystem, nodeID); if (dd.isAlive == alive) { LOG.info("datanode " + nodeID + " is " + state); return; } LOG.info("Waiting for datanode " + nodeID + " to become " + state); Thread.sleep(1000); } throw new TimeoutException( "Timedout waiting for datanode reach state " + state); }
@Override
void generateInputs(int[] ignore) throws IOException {
  final FSNamesystem namesystem = nameNode.getNamesystem();
  // start data-nodes; create a bunch of files; generate block reports.
  blockReportObject.generateInputs(ignore);
  // stop replication monitor
  BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
      .interrupt();
  try {
    BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
        .join();
  } catch (InterruptedException ei) {
    return;
  }
  // report blocks once
  int nrDatanodes = blockReportObject.getNumDatanodes();
  for (int idx = 0; idx < nrDatanodes; idx++) {
    blockReportObject.executeOp(idx, 0, null);
  }
  // decommission data-nodes
  decommissionNodes();
  // set node replication limit
  BlockManagerTestUtil.setNodeReplicationLimit(namesystem.getBlockManager(),
      nodeReplicationLimit);
}
private void printStats() throws IOException {
  BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
    LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());
    LOG.debug("Under-replicated " + cluster.getNamesystem()
        .getUnderReplicatedBlocks());
    LOG.debug("Pending delete " + cluster.getNamesystem()
        .getPendingDeletionBlocks());
    LOG.debug("Pending replications " + cluster.getNamesystem()
        .getPendingReplicationBlocks());
    LOG.debug("Excess " + cluster.getNamesystem().getExcessBlocks());
    LOG.debug("Total " + cluster.getNamesystem().getBlocksTotal());
  }
}