/**
 * Runs DFSAdmin once per listed option, using the arguments built by
 * {@code fillArgs}, and expects exit code -1 each time; expects
 * {@code -help -some} to exit with 0; finally hands two unknown options to
 * {@code checkOutput} together with the expected usage banner and
 * {@code System.err}.
 */
@Test
public void testDFSAdminInvalidUsageHelp() {
  final String usageBanner = "Usage: hdfs dfsadmin";
  final ImmutableSet<String> options = ImmutableSet.of(
      "-report", "-saveNamespace", "-rollEdits", "-restoreFailedStorage",
      "-refreshNodes", "-finalizeUpgrade", "-metasave",
      "-refreshUserToGroupsMappings", "-printTopology", "-refreshNamenodes",
      "-deleteBlockPool", "-setBalancerBandwidth", "-fetchImage");

  try {
    for (String option : options) {
      final int rc = ToolRunner.run(new DFSAdmin(), fillArgs(option));
      assertTrue(rc == -1);
    }
    final int helpRc =
        ToolRunner.run(new DFSAdmin(), new String[] {"-help", "-some"});
    assertTrue(helpRc == 0);
  } catch (Exception e) {
    fail("testDFSAdminHelp error" + e);
  }

  checkOutput(new String[] {"-cancel", "-renew"}, usageBanner, System.err,
      DFSAdmin.class);
}
@Test public void testMultipleRegistration() throws Exception { RefreshRegistry.defaultRegistry().register("sharedId", firstHandler); RefreshRegistry.defaultRegistry().register("sharedId", secondHandler); // this should trigger both DFSAdmin admin = new DFSAdmin(config); String[] args = new String[]{"-refresh", "localhost:" + cluster.getNameNodePort(), "sharedId", "one"}; int exitCode = admin.run(args); assertEquals(-1, exitCode); // -1 because one of the responses is unregistered // verify we called both Mockito.verify(firstHandler).handleRefresh("sharedId", new String[]{"one"}); Mockito.verify(secondHandler).handleRefresh("sharedId", new String[]{"one"}); RefreshRegistry.defaultRegistry().unregisterAll("sharedId"); }
@Test public void testExceptionResultsInNormalError() throws Exception { // In this test, we ensure that all handlers are called even if we throw an exception in one RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class); Mockito.stub(exceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) .toThrow(new RuntimeException("Exceptional Handler Throws Exception")); RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class); Mockito.stub(otherExceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) .toThrow(new RuntimeException("More Exceptions")); RefreshRegistry.defaultRegistry().register("exceptional", exceptionalHandler); RefreshRegistry.defaultRegistry().register("exceptional", otherExceptionalHandler); DFSAdmin admin = new DFSAdmin(config); String[] args = new String[]{"-refresh", "localhost:" + cluster.getNameNodePort(), "exceptional"}; int exitCode = admin.run(args); assertEquals(-1, exitCode); // Exceptions result in a -1 Mockito.verify(exceptionalHandler).handleRefresh("exceptional", new String[]{}); Mockito.verify(otherExceptionalHandler).handleRefresh("exceptional", new String[]{}); RefreshRegistry.defaultRegistry().unregisterAll("exceptional"); }
@Test public void testRefresh() throws Exception { assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0); assertTrue("Puts are routed through MockQueue", canPutInMockQueue()); int lastMockQueueConstructions = mockQueueConstructions; // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue DFSAdmin admin = new DFSAdmin(config); String [] args = new String[]{"-refreshCallQueue"}; int exitCode = admin.run(args); assertEquals("DFSAdmin should return 0", 0, exitCode); assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions); try { assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue()); } catch (IOException ioe){ fail("Could not put into queue at all"); } }
/**
 * Runs the given admin command with stdout redirected to {@code outStream},
 * expects exit code 0, and checks that the text captured in
 * {@code outContent} reports the expected balancer bandwidth. Stdout is put
 * back afterwards.
 *
 * @param admin the admin tool to run
 * @param args command-line arguments passed to {@code admin.run}
 * @param proxy not used by this method
 * @param expectedBandwidth bandwidth value expected in the printed message
 * @throws Exception if running the command fails unexpectedly
 */
private void runGetBalancerBandwidthCmd(DFSAdmin admin, String[] args,
    ClientDatanodeProtocol proxy, long expectedBandwidth) throws Exception {
  final PrintStream savedStdOut = System.out;
  outContent.reset();
  try {
    System.setOut(outStream);
    assertEquals("DFSAdmin should return 0", 0, admin.run(args));

    final String expectedLine =
        "Balancer bandwidth is " + expectedBandwidth + " bytes per second.";
    final String captured = new String(outContent.toByteArray(), UTF8);
    assertTrue("Wrong balancer bandwidth!", captured.contains(expectedLine));
  } finally {
    System.setOut(savedStdOut);
  }
}
/**
 * Runs {@code -setSpaceQuota} with the storage type {@code COLD} and checks
 * that the stderr output contains the string form of
 * {@code StorageType.getTypesSupportingQuota()}.
 */
@Test
public void testSetSpaceQuotaWhenStorageTypeIsWrong() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020");
  final DFSAdmin dfsAdmin = new DFSAdmin(conf);

  final ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
  final PrintStream savedErr = System.err;
  try {
    System.setErr(new PrintStream(capturedErr));
    dfsAdmin.run(new String[] {
        "-setSpaceQuota", "100", "-storageType", "COLD", "/testDir"});

    final String errText = new String(capturedErr.toByteArray(), Charsets.UTF_8);
    final String supportedTypes = StorageType.getTypesSupportingQuota().toString();
    assertTrue(errText.contains(supportedTypes));
  } finally {
    // Put the real stderr back regardless of the outcome.
    System.setErr(savedErr);
  }
}
/**
 * Runs DFSAdmin once per listed option, using the arguments built by
 * {@code fillArgs}, and expects exit code -1 each time; expects
 * {@code -help -some} to exit with 0; finally hands two unknown options to
 * {@code checkOutput} together with the expected usage banner and
 * {@code System.err}.
 */
@Test
public void testDFSAdminInvalidUsageHelp() {
  final String usageBanner = "Usage: java DFSAdmin";
  final ImmutableSet<String> options = ImmutableSet.of(
      "-report", "-saveNamespace", "-rollEdits", "-restoreFailedStorage",
      "-refreshNodes", "-finalizeUpgrade", "-metasave",
      "-refreshUserToGroupsMappings", "-printTopology", "-refreshNamenodes",
      "-deleteBlockPool", "-setBalancerBandwidth", "-fetchImage");

  try {
    for (String option : options) {
      final int rc = ToolRunner.run(new DFSAdmin(), fillArgs(option));
      assertTrue(rc == -1);
    }
    final int helpRc =
        ToolRunner.run(new DFSAdmin(), new String[] {"-help", "-some"});
    assertTrue(helpRc == 0);
  } catch (Exception e) {
    fail("testDFSAdminHelp error" + e);
  }

  checkOutput(new String[] {"-cancel", "-renew"}, usageBanner, System.err,
      DFSAdmin.class);
}
@Test public void testGroupMappingRefresh() throws Exception { DFSAdmin admin = new DFSAdmin(config); String [] args = new String[]{"-refreshUserToGroupsMappings"}; Groups groups = Groups.getUserToGroupsMappingService(config); String user = UserGroupInformation.getCurrentUser().getUserName(); System.out.println("first attempt:"); List<String> g1 = groups.getGroups(user); String [] str_groups = new String [g1.size()]; g1.toArray(str_groups); System.out.println(Arrays.toString(str_groups)); System.out.println("second attempt, should be same:"); List<String> g2 = groups.getGroups(user); g2.toArray(str_groups); System.out.println(Arrays.toString(str_groups)); for(int i=0; i<g2.size(); i++) { assertEquals("Should be same group ", g1.get(i), g2.get(i)); } admin.run(args); System.out.println("third attempt(after refresh command), should be different:"); List<String> g3 = groups.getGroups(user); g3.toArray(str_groups); System.out.println(Arrays.toString(str_groups)); for(int i=0; i<g3.size(); i++) { assertFalse("Should be different group: " + g1.get(i) + " and " + g3.get(i), g1.get(i).equals(g3.get(i))); } // test time out Thread.sleep(groupRefreshTimeoutSec*1100); System.out.println("fourth attempt(after timeout), should be different:"); List<String> g4 = groups.getGroups(user); g4.toArray(str_groups); System.out.println(Arrays.toString(str_groups)); for(int i=0; i<g4.size(); i++) { assertFalse("Should be different group ", g3.get(i).equals(g4.get(i))); } }
/**
 * Runs the tool selected by {@code clazz} with {@code args} while capturing
 * either stdout or stderr, then asserts that the captured text contains
 * {@code pattern}.
 *
 * <p>The original process-wide streams are always restored before returning,
 * so later output from the test run is not lost. Output is captured directly
 * into an in-memory buffer; no fixed-size pipe is written and read on the
 * same thread, so large output cannot block the test.
 *
 * @param args command-line arguments handed to the tool
 * @param pattern text expected somewhere in the captured output
 * @param out the stream to capture; must be the current {@code System.out}
 *        or {@code System.err}, otherwise nothing is redirected
 * @param clazz selects which tool is run: {@code DelegationTokenFetcher},
 *        {@code JMXGet} or {@code DFSAdmin}; any other class runs nothing
 */
private void checkOutput(String[] args, String pattern, PrintStream out,
    Class<?> clazz) {
  final PrintStream originalOut = System.out;
  final PrintStream originalErr = System.err;
  final ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
  final PrintStream capture = new PrintStream(outBytes, true);
  try {
    if (out == originalOut) {
      System.setOut(capture);
    } else if (out == originalErr) {
      System.setErr(capture);
    }

    if (clazz == DelegationTokenFetcher.class) {
      expectDelegationTokenFetcherExit(args);
    } else if (clazz == JMXGet.class) {
      expectJMXGetExit(args);
    } else if (clazz == DFSAdmin.class) {
      expectDfsAdminPrint(args);
    }

    capture.flush();
    // Decoded with the platform default charset, matching the PrintStream above.
    assertTrue(new String(outBytes.toByteArray()).contains(pattern));
  } catch (Exception ex) {
    fail("checkOutput error " + ex);
  } finally {
    // Undo the redirection even if the tool or the assertion failed.
    System.setOut(originalOut);
    System.setErr(originalErr);
    capture.close();
  }
}
/**
 * Runs a fresh {@code DFSAdmin} through {@code ToolRunner} with the given
 * arguments and fails the test if the run throws.
 *
 * @param args command-line arguments handed to DFSAdmin
 */
private void expectDfsAdminPrint(String[] args) {
  try {
    ToolRunner.run(new DFSAdmin(), args);
  } catch (Exception ex) {
    // Name this helper in the message so a failure points at the right place.
    fail("expectDfsAdminPrint ex error " + ex);
  }
}
/**
 * Runs {@code -refresh} with only one further argument and expects DFSAdmin
 * to exit with -1.
 */
@Test
public void testInvalidCommand() throws Exception {
  final DFSAdmin dfsAdmin = new DFSAdmin(config);
  final int status = dfsAdmin.run(new String[]{"-refresh", "nn"});
  assertEquals("DFSAdmin should fail due to bad args", -1, status);
}
/**
 * Sends a refresh for the identifier {@code unregisteredIdentity} to the
 * cluster's namenode port and expects DFSAdmin to exit with -1.
 */
@Test
public void testInvalidIdentifier() throws Exception {
  final DFSAdmin dfsAdmin = new DFSAdmin(config);
  final int status = dfsAdmin.run(new String[]{
      "-refresh",
      "localhost:" + cluster.getNameNodePort(),
      "unregisteredIdentity"});
  assertEquals("DFSAdmin should fail due to no handler registered", -1, status);
}
@Test public void testValidIdentifier() throws Exception { DFSAdmin admin = new DFSAdmin(config); String[] args = new String[]{"-refresh", "localhost:" + cluster.getNameNodePort(), "firstHandler"}; int exitCode = admin.run(args); assertEquals("DFSAdmin should succeed", 0, exitCode); Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{}); // Second handler was never called Mockito.verify(secondHandler, Mockito.never()) .handleRefresh(Mockito.anyString(), Mockito.any(String[].class)); }
/**
 * Sends two refreshes to {@code secondHandler}, first with one extra
 * argument and then with two, expecting exit codes 2 and 3 respectively,
 * and verifies that the handler received both argument arrays.
 */
@Test
public void testVariableArgs() throws Exception {
  final DFSAdmin dfsAdmin = new DFSAdmin(config);

  final String[] oneArgCall = {
      "-refresh", "localhost:" + cluster.getNameNodePort(), "secondHandler", "one"};
  int status = dfsAdmin.run(oneArgCall);
  assertEquals("DFSAdmin should return 2", 2, status);

  final String[] twoArgCall = {
      "-refresh", "localhost:" + cluster.getNameNodePort(), "secondHandler", "one", "two"};
  status = dfsAdmin.run(twoArgCall);
  assertEquals("DFSAdmin should now return 3", 3, status);

  Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one"});
  Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one", "two"});
}
@Test public void testUnregistration() throws Exception { RefreshRegistry.defaultRegistry().unregisterAll("firstHandler"); // And now this should fail DFSAdmin admin = new DFSAdmin(config); String[] args = new String[]{"-refresh", "localhost:" + cluster.getNameNodePort(), "firstHandler"}; int exitCode = admin.run(args); assertEquals("DFSAdmin should return -1", -1, exitCode); }
@Test public void testMultipleReturnCodeMerging() throws Exception { // Two handlers which return two non-zero values RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class); Mockito.stub(handlerOne.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) .toReturn(new RefreshResponse(23, "Twenty Three")); RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class); Mockito.stub(handlerTwo.handleRefresh(Mockito.anyString(), Mockito.any(String[].class))) .toReturn(new RefreshResponse(10, "Ten")); // Then registered to the same ID RefreshRegistry.defaultRegistry().register("shared", handlerOne); RefreshRegistry.defaultRegistry().register("shared", handlerTwo); // We refresh both DFSAdmin admin = new DFSAdmin(config); String[] args = new String[]{"-refresh", "localhost:" + cluster.getNameNodePort(), "shared"}; int exitCode = admin.run(args); assertEquals(-1, exitCode); // We get -1 because of our logic for melding non-zero return codes // Verify we called both Mockito.verify(handlerOne).handleRefresh("shared", new String[]{}); Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{}); RefreshRegistry.defaultRegistry().unregisterAll("shared"); }
/** * Test recovery on restart OOB message. It also tests the delivery of * OOB ack originating from the primary datanode. Since there is only * one node in the cluster, failure of restart-recovery will fail the * test. */ @Test public void testPipelineRecoveryOnOOB() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15"); MiniDFSCluster cluster = null; try { int numDataNodes = 1; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); FileSystem fileSys = cluster.getFileSystem(); Path file = new Path("dataprotocol2.dat"); DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); DFSOutputStream out = (DFSOutputStream)(fileSys.append(file). getWrappedStream()); out.write(1); out.hflush(); DFSAdmin dfsadmin = new DFSAdmin(conf); DataNode dn = cluster.getDataNodes().get(0); final String dnAddr = dn.getDatanodeId().getIpcAddr(false); // issue shutdown to the datanode. final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" }; Assert.assertEquals(0, dfsadmin.run(args1)); // Wait long enough to receive an OOB ack before closing the file. Thread.sleep(4000); // Retart the datanode cluster.restartDataNode(0, true); // The following forces a data packet and end of block packets to be sent. out.close(); } finally { if (cluster != null) { cluster.shutdown(); } } }
/**
 * Finalize the namenode by running {@code dfsadmin -finalizeUpgrade} with
 * the given configuration. Block pools corresponding to the namenode are
 * finalized on the datanode.
 *
 * @param nn the namenode; only checked for being non-null
 * @param conf configuration handed to DFSAdmin
 * @throws IllegalStateException if {@code nn} is null
 */
private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception {
  if (nn == null) {
    throw new IllegalStateException(
        "Attempting to finalize Namenode but it is not running");
  }
  final DFSAdmin dfsAdmin = new DFSAdmin(conf);
  ToolRunner.run(dfsAdmin, new String[] {"-finalizeUpgrade"});
}
/** * default setting is file:// which is not a DFS * so DFSAdmin should throw and catch InvalidArgumentException * and return -1 exit code. * @throws Exception */ @Test (timeout = 30000) public void testInvalidShell() throws Exception { Configuration conf = new Configuration(); // default FS (non-DFS) DFSAdmin admin = new DFSAdmin(); admin.setConf(conf); int res = admin.run(new String[] {"-refreshNodes"}); assertEquals("expected to fail -1", res , -1); }
/**
 * Runs the given dfsadmin command and asserts on its exit code.
 *
 * @param dfsadmin the admin tool to run
 * @param success if true the exit code must be 0, otherwise it must be non-zero
 * @param args command-line arguments passed to {@code dfsadmin.run}
 * @throws Exception if running the command throws
 */
public static void runCmd(DFSAdmin dfsadmin, boolean success,
    String... args) throws Exception {
  final int exitCode = dfsadmin.run(args);
  if (!success) {
    Assert.assertTrue(exitCode != 0);
    return;
  }
  Assert.assertEquals(0, exitCode);
}
@Test public void testDFSAdminDatanodeUpgradeControlCommands() throws Exception { // start a cluster final Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); final DFSAdmin dfsadmin = new DFSAdmin(conf); DataNode dn = cluster.getDataNodes().get(0); // check the datanode final String dnAddr = dn.getDatanodeId().getIpcAddr(false); final String[] args1 = {"-getDatanodeInfo", dnAddr}; Assert.assertEquals(0, dfsadmin.run(args1)); // issue shutdown to the datanode. final String[] args2 = {"-shutdownDatanode", dnAddr, "upgrade" }; Assert.assertEquals(0, dfsadmin.run(args2)); // the datanode should be down. Thread.sleep(2000); Assert.assertFalse("DataNode should exit", dn.isDatanodeUp()); // ping should fail. Assert.assertEquals(-1, dfsadmin.run(args1)); } finally { if (cluster != null) cluster.shutdown(); } }
/**
 * Download a few fsimages using `hdfs dfsadmin -fetchImage ...' and verify
 * the results: once on the fresh cluster, and once more after creating
 * directories and saving the namespace in safe mode.
 *
 * @throws Exception if the cluster, the file system or the fetch fails
 */
@Test
public void testFetchImage() throws Exception {
  FETCHED_IMAGE_FILE.mkdirs();
  Configuration conf = new Configuration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  FileSystem fs = null;
  try {
    DFSAdmin dfsAdmin = new DFSAdmin();
    dfsAdmin.setConf(conf);

    runFetchImage(dfsAdmin, cluster);

    fs = cluster.getFileSystem();
    fs.mkdirs(new Path("/foo"));
    fs.mkdirs(new Path("/foo2"));
    fs.mkdirs(new Path("/foo3"));

    cluster.getNameNodeRpc()
        .setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
    cluster.getNameNodeRpc().saveNamespace();
    cluster.getNameNodeRpc()
        .setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);

    runFetchImage(dfsAdmin, cluster);
  } finally {
    // Nested so that a failure while closing the file system cannot skip
    // shutting down the cluster.
    try {
      if (fs != null) {
        fs.close();
      }
    } finally {
      cluster.shutdown();
    }
  }
}
/**
 * Run `hdfs dfsadmin -fetchImage ...' into {@code FETCHED_IMAGE_FILE} and
 * verify that the downloaded image has the same MD5 as the highest fsimage
 * found on the cluster.
 *
 * @param dfsAdmin the admin tool used to fetch the image
 * @param cluster the cluster whose highest fsimage is the reference
 * @throws Exception if the fetch or the checksum computation fails
 */
private static void runFetchImage(DFSAdmin dfsAdmin, MiniDFSCluster cluster)
    throws Exception {
  final String[] fetchArgs = {"-fetchImage", FETCHED_IMAGE_FILE.getPath()};
  assertEquals(0, dfsAdmin.run(fetchArgs));

  final File referenceImage = getHighestFsImageOnCluster(cluster);
  final MD5Hash referenceMd5 = MD5FileUtils.computeMd5ForFile(referenceImage);

  // The fetched copy is looked up under the same file name as the reference.
  final File fetchedImage = new File(FETCHED_IMAGE_FILE, referenceImage.getName());
  final MD5Hash fetchedMd5 = MD5FileUtils.computeMd5ForFile(fetchedImage);

  assertEquals(referenceMd5, fetchedMd5);
}
private void startRollingUpgrade() throws Exception { LOG.info("Starting rolling upgrade"); fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); final DFSAdmin dfsadmin = new DFSAdmin(conf); TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare"); triggerHeartBeats(); // Ensure datanode rolling upgrade is started assertTrue(dn0.getFSDataset().trashEnabled(blockPoolId)); }