@Test(timeout = 600000)
public void testDatanodeRUwithRegularUpgrade() throws Exception {
  try {
    startCluster();
    rollingUpgradeAndFinalize();
    DataNodeProperties dn = cluster.stopDataNode(0);
    cluster.restartNameNode(0, true, "-upgrade");
    cluster.restartDataNode(dn, true);
    cluster.waitActive();
    fs = cluster.getFileSystem(0);
    Path testFile3 = new Path("/" + GenericTestUtils.getMethodName() + ".03.dat");
    DFSTestUtil.createFile(fs, testFile3, FILE_SIZE, REPL_FACTOR, SEED);
    cluster.getFileSystem().finalizeUpgrade();
  } finally {
    shutdownCluster();
  }
}
/**
 * Verify the support for decommissioning a datanode that is already dead.
 * Under this scenario the datanode should immediately be marked as
 * DECOMMISSIONED.
 */
@Test(timeout = 120000)
public void testDecommissionDeadDN()
    throws IOException, InterruptedException, TimeoutException {
  DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
  String dnName = dnID.getXferAddr();
  DataNodeProperties stoppedDN = cluster.stopDataNode(0);
  DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);
  FSNamesystem fsn = cluster.getNamesystem();
  final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
  DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
  decommissionNode(fsn, localFileSys, dnName);
  dm.refreshNodes(conf);
  BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
  assertTrue(dnDescriptor.isDecommissioned());

  // Add the node back
  cluster.restartDataNode(stoppedDN, true);
  cluster.waitActive();

  // Call refreshNodes on FSNamesystem with empty exclude file to remove the
  // datanode from decommissioning list and make it available again.
  writeConfigFile(localFileSys, excludeFile, null);
  dm.refreshNodes(conf);
}
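The `decommissionNode(...)` and `writeConfigFile(...)` calls above (also used in the second `testDecommissionDeadDN` variant further down) come from the test's own base class and are not shown here. A minimal sketch of what such helpers typically do, assuming `excludeFile` is a local path registered in `conf` under `dfs.hosts.exclude`; names and signatures are illustrative, not the actual Hadoop test code:

private void decommissionNode(FSNamesystem fsn, FileSystem localFileSys,
    String dnName) throws IOException {
  // Illustrative sketch only. Write the datanode's transfer address into the
  // exclude file so that a subsequent refreshNodes() marks it for decommission.
  writeConfigFile(localFileSys, excludeFile, Arrays.asList(dnName));
}

private void writeConfigFile(FileSystem localFileSys, Path name,
    List<String> nodes) throws IOException {
  // Recreate the file from scratch; a null or empty list clears the excludes.
  if (localFileSys.exists(name)) {
    localFileSys.delete(name, true);
  }
  try (FSDataOutputStream stm = localFileSys.create(name)) {
    if (nodes != null) {
      for (String node : nodes) {
        stm.writeBytes(node + "\n");
      }
    }
  }
}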
@Test(timeout = 180000)
public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
  dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);
  tableName = getName();
  Table table = createTestTable(tableName);
  TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
  doPut(table, 1);
  server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
  RegionInfo hri = server.getRegions(table.getName()).get(0).getRegionInfo();
  AsyncFSWAL wal = (AsyncFSWAL) server.getWAL(hri);
  int numRolledLogFiles = AsyncFSWALProvider.getNumRolledLogFiles(wal);
  DatanodeInfo[] dnInfos = wal.getPipeline();
  DataNodeProperties dnProp =
      TEST_UTIL.getDFSCluster().stopDataNode(dnInfos[0].getName());
  TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
  doPut(table, 2);
  assertEquals(numRolledLogFiles + 1, AsyncFSWALProvider.getNumRolledLogFiles(wal));
}
@Test
public void testConnectToDatanodeFailed() throws IOException, ClassNotFoundException,
    NoSuchMethodException, IllegalAccessException, InvocationTargetException,
    InterruptedException, NoSuchFieldException {
  Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
  xceiverServerDaemonField.setAccessible(true);
  Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
  Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
  numPeersMethod.setAccessible(true);
  // make one datanode broken
  DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
  Path f = new Path("/test");
  EventLoop eventLoop = EVENT_LOOP_GROUP.next();
  try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
      FS, f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
    // should exclude the dead dn when retrying, so here we only have 2 DNs in the pipeline
    assertEquals(2, output.getPipeline().length);
  } finally {
    TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
  }
}
private static void rollbackRollingUpgrade(Path foo, Path bar, Path file, byte[] data,
    MiniDFSCluster cluster) throws IOException {
  final DataNodeProperties dnprop = cluster.stopDataNode(0);
  cluster.restartNameNode("-rollingUpgrade", "rollback");
  cluster.restartDataNode(dnprop, true);

  final DistributedFileSystem dfs = cluster.getFileSystem();
  Assert.assertTrue(dfs.exists(foo));
  Assert.assertFalse(dfs.exists(bar));
  AppendTestUtil.checkFullFile(dfs, file, data.length, data);
}
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
    throws IOException {
  // stop the DN, reformat it, then start it again with the same xfer port.
  DataNodeProperties dnProps = cluster.stopDataNode(dnIndex);
  cluster.formatDataNodeDirs();
  return cluster.restartDataNode(dnProps, true);
}
/**
 * The corrupt block has to be removed when the number of valid replicas
 * matches the replication factor for the file. In this test, the above
 * condition is achieved by increasing the number of good replicas by
 * replicating onto a new datanode.
 * The test strategy:
 *   Bring up a cluster with 3 DataNodes
 *   Create a file with replication factor 3
 *   Corrupt one replica of a block of the file
 *   Verify that there are still 2 good replicas and 1 corrupt replica
 *     (the corrupt replica should not be removed since the number of good
 *     replicas (2) is less than the replication factor (3))
 *   Start a new datanode
 *   Verify that a new replica is created and the corrupt replica is removed.
 */
@Test
public void testByAddingAnExtraDataNode() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
  conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
      Integer.toString(2));
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
  FileSystem fs = cluster.getFileSystem();
  final FSNamesystem namesystem = cluster.getNamesystem();
  DataNodeProperties dnPropsFourth = cluster.stopDataNode(3);
  try {
    final Path fileName = new Path("/foo1");
    DFSTestUtil.createFile(fs, fileName, 2, (short) 3, 0L);
    DFSTestUtil.waitReplication(fs, fileName, (short) 3);

    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
    corruptBlock(cluster, fs, fileName, 0, block);

    DFSTestUtil.waitReplication(fs, fileName, (short) 2);
    assertEquals(2, countReplicas(namesystem, block).liveReplicas());
    assertEquals(1, countReplicas(namesystem, block).corruptReplicas());

    cluster.restartDataNode(dnPropsFourth);

    DFSTestUtil.waitReplication(fs, fileName, (short) 3);
    assertEquals(3, countReplicas(namesystem, block).liveReplicas());
    assertEquals(0, countReplicas(namesystem, block).corruptReplicas());
  } finally {
    cluster.shutdown();
  }
}
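The `countReplicas(...)` helper used above is not shown. A hedged sketch of what such a helper typically does, delegating to the NameNode's BlockManager; note that the exact `countNodes` signature has varied across Hadoop versions:

private static NumberReplicas countReplicas(final FSNamesystem namesystem,
    ExtendedBlock block) {
  // Sketch only; returns the live/corrupt/excess replica counts as currently
  // tracked by the NameNode for this block.
  final BlockManager blockManager = namesystem.getBlockManager();
  return blockManager.countNodes(blockManager.getStoredBlock(block.getLocalBlock()));
}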
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
    int dnIndex, ExtendedBlock block) throws IOException {
  // Corrupt the block on datanode dnIndex. The indexes change once the nodes
  // are restarted, but the data directory will not change.
  assertTrue(cluster.corruptReplica(dnIndex, block));
  DataNodeProperties dnProps = cluster.stopDataNode(0);

  // Each datanode has multiple data dirs; check each one.
  for (int dirIndex = 0; dirIndex < 2; dirIndex++) {
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    File storageDir = cluster.getStorageDir(dnIndex, dirIndex);
    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
    File scanLogFile = new File(dataDir, "dncp_block_verification.log.curr");
    if (scanLogFile.exists()) {
      // Wait for up to one minute for the deletion to succeed.
      for (int i = 0; !scanLogFile.delete(); i++) {
        assertTrue("Could not delete log file in one minute", i < 60);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

  // Restart the datanode so the corrupt replica will be detected.
  cluster.restartDataNode(dnProps);
}
/**
 * Verify the support for decommissioning a datanode that is already dead.
 * Under this scenario the datanode should immediately be marked as
 * DECOMMISSIONED.
 */
@Test(timeout = 120000)
public void testDecommissionDeadDN() throws Exception {
  Logger log = Logger.getLogger(DecommissionManager.class);
  log.setLevel(Level.DEBUG);
  DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
  String dnName = dnID.getXferAddr();
  DataNodeProperties stoppedDN = cluster.stopDataNode(0);
  DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);
  FSNamesystem fsn = cluster.getNamesystem();
  final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
  DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
  decommissionNode(fsn, localFileSys, dnName);
  dm.refreshNodes(conf);
  BlockManagerTestUtil.recheckDecommissionState(dm);
  assertTrue(dnDescriptor.isDecommissioned());

  // Add the node back
  cluster.restartDataNode(stoppedDN, true);
  cluster.waitActive();

  // Call refreshNodes on FSNamesystem with empty exclude file to remove the
  // datanode from decommissioning list and make it available again.
  writeConfigFile(localFileSys, excludeFile, null);
  dm.refreshNodes(conf);
}
/**
 * Test for the case where the client begins to read a long block, but doesn't
 * read bytes off the stream quickly. The datanode should time out sending the
 * chunks and the transceiver should die, even if it has a long keepalive.
 */
@Test(timeout = 300000)
public void testSlowReader() throws Exception {
  // Set a client socket cache expiry time much longer than
  // the datanode-side expiration time.
  final long CLIENT_EXPIRY_MS = 600000L;
  Configuration clientConf = new Configuration(conf);
  clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
  clientConf.set(DFS_CLIENT_CONTEXT, "testSlowReader");
  DistributedFileSystem fs =
      (DistributedFileSystem) FileSystem.get(cluster.getURI(), clientConf);

  // Restart the DN with a shorter write timeout.
  DataNodeProperties props = cluster.stopDataNode(0);
  props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, WRITE_TIMEOUT);
  props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 120000);
  assertTrue(cluster.restartDataNode(props, true));
  dn = cluster.getDataNodes().get(0);

  // Wait for heartbeats to avoid a startup race where we
  // try to write the block while the DN is still starting.
  cluster.triggerHeartbeats();

  DFSTestUtil.createFile(fs, TEST_FILE, 1024 * 1024 * 8L, (short) 1, 0L);
  FSDataInputStream stm = fs.open(TEST_FILE);
  stm.read();
  assertXceiverCount(1);

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    public Boolean get() {
      // DN should time out in sendChunks, and this should force
      // the xceiver to exit.
      return getXceiverCountWithoutServer() == 0;
    }
  }, 500, 50000);

  IOUtils.closeStream(stm);
}
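The `getXceiverCountWithoutServer()` helper referenced here (and in the second `testSlowReader` variant at the end of this section) is not shown. A hypothetical sketch, assuming `DataNode#getXceiverCount()` also counts the DataXceiverServer acceptor thread itself; the real test's implementation may differ:

private int getXceiverCountWithoutServer() {
  // Subtract one for the always-running acceptor thread, leaving only the
  // active data-transfer xceivers.
  return dn.getXceiverCount() - 1;
}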
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
    int dnIndex, ExtendedBlock block) throws IOException {
  // Truncate the block on the first datanode that has not been corrupted,
  // so that the directory scanner can discover the corruption from the file
  // size change. The indexes change once the nodes are restarted, but the
  // data directory will not change.
  cluster.getMaterializedReplica(0, block).truncateData(10);
  // Run the directory scanner to update the DN's volume map.
  DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
  DataNodeProperties dnProps = cluster.stopDataNode(0);

  // Each datanode has multiple data dirs; check each one.
  for (int dirIndex = 0; dirIndex < 2; dirIndex++) {
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    File storageDir = cluster.getStorageDir(dnIndex, dirIndex);
    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
    File scanLogFile = new File(dataDir, "dncp_block_verification.log.curr");
    if (scanLogFile.exists()) {
      // Wait for up to one minute for the deletion to succeed.
      for (int i = 0; !scanLogFile.delete(); i++) {
        assertTrue("Could not delete log file in one minute", i < 60);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

  // Restart the datanode so the corrupt replica will be detected.
  cluster.restartDataNode(dnProps);
}
private void rollbackRollingUpgrade() throws Exception {
  // Shutdown datanodes and namenodes.
  // Restart the namenode with rolling upgrade rollback.
  LOG.info("Starting rollback of the rolling upgrade");
  MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
  dnprop.setDnArgs("-rollback");
  cluster.shutdownNameNodes();
  cluster.restartNameNode("-rollingupgrade", "rollback");
  cluster.restartDataNode(dnprop);
  cluster.waitActive();
  nn = cluster.getNameNode(0);
  dn0 = cluster.getDataNodes().get(0);
  triggerHeartBeats();
  LOG.info("The cluster is active after rollback");
}
private void runDNRestartCorruptType(CorruptionType corrupt) throws Exception {
  cluster = new MiniDFSCluster(conf, 3, true, null);
  FileSystem fs1 = cluster.getFileSystem();
  try {
    short rep = 3; // replication
    assertTrue(BLOCK_SIZE % 4 == 0);

    file1 = new Path("/dnDeath.dat");

    // write 1/2 block & close
    stm = fs1.create(file1, true, 1024, rep, 4096);
    AppendTestUtil.write(stm, 0, 1024);
    stm.sync();
    loseLeases(fs1);

    DFSOutputStream dfso = (DFSOutputStream) stm.getWrappedStream();
    dfso.abortForTests();

    // close the primary DN
    DataNodeProperties badDN = cluster.stopDataNode(0);

    // Truncate the block on the primary DN
    corruptDataNode(0, corrupt);

    // Start the DN back up
    cluster.restartDataNode(badDN);

    // Recover the lease
    FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
    recoverFile(fs2);

    assertFileSize(fs2, 1024);
    checkFile(fs2, 1024);
  } finally {
    // explicitly do not shut down fs1, since it's been frozen up by
    // killing the DataStreamer and not allowing recovery
    cluster.shutdown();
  }
}
/**
 * Test that the restart of a DN and the subsequent pipeline recovery do not cause
 * a file to become prematurely considered "complete", when it's a fresh file
 * with no .append() called.
 */
public void testNotPrematurelyCompleteWithFailureNotReopened() throws Exception {
  LOG.info("START");
  cluster = new MiniDFSCluster(conf, 3, true, null);
  NameNode nn = cluster.getNameNode();
  FileSystem fs1 = cluster.getFileSystem();
  try {
    short rep = 3; // replication

    file1 = new Path("/delayedReceiveBlock");

    stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, rep, 64 * 1024 * 1024);
    LOG.info("======== Writing");
    AppendTestUtil.write(stm, 0, 1024 * 1024);

    LOG.info("======== Waiting for a block allocation");
    waitForBlockReplication(fs1, "/delayedReceiveBlock", 0, 3000);

    LOG.info("======== Checking not complete");
    assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));

    // Stop one of the DNs, don't restart
    MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);

    // Write some more data
    AppendTestUtil.write(stm, 0, 1024 * 1024);

    // Make sure we don't see the file as complete
    LOG.info("======== Checking progress");
    assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));

    LOG.info("======== Closing");
    stm.close();
  } finally {
    LOG.info("======== Cleaning up");
    fs1.close();
    cluster.shutdown();
  }
}
private DataNodeProperties shutdownDataNode(MiniDFSCluster cluster,
    DatanodeDescriptor datanode) {
  LOG.info("shutdown datanode: " + datanode.getName());
  DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
  FSNamesystem namesystem = cluster.getNameNode().namesystem;
  // make sure that the NN detects that the datanode is down
  synchronized (namesystem.heartbeats) {
    datanode.setLastUpdate(0); // mark it dead
    namesystem.heartbeatCheck();
  }
  return dnprop;
}
public DataNodeProperties shutdownDataNode(MiniDFSCluster cluster,
    DatanodeDescriptor datanode) {
  LOG.info("shutdown datanode: " + datanode.getName());
  DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
  FSNamesystem namesystem = cluster.getNameNode().namesystem;
  // make sure that the NN detects that the datanode is down
  synchronized (namesystem.heartbeats) {
    datanode.setLastUpdate(0); // mark it dead
    namesystem.heartbeatCheck();
  }
  return dnprop;
}
@Test(timeout = 5 * 60 * 1000)
public void testAndReportDeletionPolicy() throws IOException, InterruptedException {
  int nodeStop = 0;
  DataNodeProperties dataNode = cluster.stopDataNode(nodeStop);
  waitForDataNodeToDie(nodeStop);
  String situation1 = getSituation("DataNode stopped.");
  cluster.restartDataNode(dataNode);
  waitForDataNodeToBack(nodeStop);
  String situation2 = getSituation("DataNode restored.");

  LOG.info("Test Report.");
  LOG.info("Favored Nodes: " + Arrays.toString(favoredHosts));
  LOG.info("Node Stopped: " + favoredHosts[nodeStop]);
  LOG.info(situation1);
  LOG.info(situation2);
  LOG.info("LOG DONE!");

  for (String hFile : hFiles) {
    if (!hFile.startsWith(TARGET_REGION_PREFIX)) {
      continue;
    }
    List<LocatedBlock> lbs =
        nameNode.getBlockLocations(hFile, 0, FILE_LENGTH).getLocatedBlocks();
    for (LocatedBlock lb : lbs) {
      assertEquals("Not correctly replicated.", REPLICATION, lb.getLocations().length);
      for (DatanodeInfo dnInfo : lb.getLocations()) {
        assertTrue("A node not part of the favored nodes was discovered.",
            Arrays.asList(favoredHosts).contains(dnInfo.getName()));
      }
    }
  }
}
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
    int dnIndex, ExtendedBlock block) throws IOException {
  // Corrupt the block on datanode dnIndex. The indexes change once the nodes
  // are restarted, but the data directory will not change.
  assertTrue(MiniDFSCluster.corruptReplica(dnIndex, block));
  DataNodeProperties dnProps = cluster.stopDataNode(0);

  // Each datanode has multiple data dirs; check each one.
  for (int dirIndex = 0; dirIndex < 2; dirIndex++) {
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    File storageDir = MiniDFSCluster.getStorageDir(dnIndex, dirIndex);
    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
    File scanLogFile = new File(dataDir, "dncp_block_verification.log.curr");
    if (scanLogFile.exists()) {
      // Wait for up to one minute for the deletion to succeed.
      for (int i = 0; !scanLogFile.delete(); i++) {
        assertTrue("Could not delete log file in one minute", i < 60);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

  // Restart the datanode so the corrupt replica will be detected.
  cluster.restartDataNode(dnProps);
}
/**
 * Test for the case where the client begins to read a long block, but doesn't
 * read bytes off the stream quickly. The datanode should time out sending the
 * chunks and the transceiver should die, even if it has a long keepalive.
 */
@Test(timeout = 30000)
public void testSlowReader() throws Exception {
  // Restart the DN with a shorter write timeout.
  DataNodeProperties props = cluster.stopDataNode(0);
  props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, WRITE_TIMEOUT);
  props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 120000);
  assertTrue(cluster.restartDataNode(props, true));
  // Wait for heartbeats to avoid a startup race where we
  // try to write the block while the DN is still starting.
  cluster.triggerHeartbeats();
  dn = cluster.getDataNodes().get(0);

  DFSTestUtil.createFile(fs, TEST_FILE, 1024 * 1024 * 8L, (short) 1, 0L);
  FSDataInputStream stm = fs.open(TEST_FILE);
  try {
    stm.read();
    assertXceiverCount(1);

    // Poll for 0 running xceivers. Allow up to 5 seconds of slack.
    long totalSleepTime = 0;
    long sleepTime = WRITE_TIMEOUT + 100;
    while (getXceiverCountWithoutServer() > 0 && totalSleepTime < 5000) {
      Thread.sleep(sleepTime);
      totalSleepTime += sleepTime;
      sleepTime = 100;
    }

    // DN should time out in sendChunks, and this should force
    // the xceiver to exit.
    assertXceiverCount(0);
  } finally {
    IOUtils.closeStream(stm);
  }
}