private static void assertEpochFilesCopied(MiniQJMHACluster jnCluster)
    throws IOException {
  for (int i = 0; i < 3; i++) {
    File journalDir = jnCluster.getJournalCluster().getJournalDir(i, "ns1");
    File currDir = new File(journalDir, "current");
    File prevDir = new File(journalDir, "previous");
    for (String fileName : new String[]{ Journal.LAST_PROMISED_FILENAME,
        Journal.LAST_WRITER_EPOCH }) {
      File prevFile = new File(prevDir, fileName);
      // The previous file may not exist, e.g. if there has never been a
      // writer before the upgrade.
      if (prevFile.exists()) {
        PersistentLongFile prevLongFile =
            new PersistentLongFile(prevFile, -10);
        PersistentLongFile currLongFile =
            new PersistentLongFile(new File(currDir, fileName), -11);
        assertTrue("Value in " + fileName + " has decreased on upgrade in "
            + journalDir, prevLongFile.get() <= currLongFile.get());
      }
    }
  }
}

@Before
public void setup() throws Exception {
  Configuration conf = new Configuration();
  // Turn off IPC client caching so that the suite can handle
  // the restart of the daemons between test cases.
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
      0);
  MiniQJMHACluster miniQjmHaCluster =
      new MiniQJMHACluster.Builder(conf).build();
  cluster = miniQjmHaCluster.getDfsCluster();
  jCluster = miniQjmHaCluster.getJournalCluster();
  // Make nn0 active.
  cluster.transitionToActive(0);
  // Do something to generate in-progress edit log data.
  DistributedFileSystem dfs = (DistributedFileSystem)
      HATestUtil.configureFailoverFs(cluster, conf);
  dfs.mkdirs(new Path("/test2"));
  dfs.close();
}

private void setUpHaCluster(boolean security) throws Exception {
  conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
      security);
  cluster = new MiniQJMHACluster.Builder(conf).build();
  setHAConf(conf, cluster.getDfsCluster().getNameNode(0).getHostAndPort(),
      cluster.getDfsCluster().getNameNode(1).getHostAndPort());
  admin = new DFSAdmin();
  admin.setConf(conf);
  assertTrue(HAUtil.isHAEnabled(conf, "ns1"));

  // Capture stdout/stderr so DFSAdmin output can be asserted on, saving the
  // original streams so they can be restored in teardown.
  originOut = System.out;
  originErr = System.err;
  System.setOut(new PrintStream(out));
  System.setErr(new PrintStream(err));
}

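// setUpHaCluster relies on a setHAConf helper that is not shown in this
// section. A minimal sketch, using the nameservice id "ns1" from the
// assertion above and assuming NameNode ids "nn1" and "nn2" (the ids are an
// assumption, not taken from the code above), might look like:
private void setHAConf(Configuration conf, String nn1Addr, String nn2Addr) {
  // Point the default filesystem at the logical HA nameservice.
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
  conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
  conf.set(DFSUtil.addKeySuffixes(
      DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "ns1"), "nn1,nn2");
  // Map each NameNode id to its RPC address so failover can be configured.
  conf.set(DFSUtil.addKeySuffixes(
      DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), nn1Addr);
  conf.set(DFSUtil.addKeySuffixes(
      DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), nn2Addr);
}
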
@Before
public void setup() throws Exception {
  Configuration conf = new Configuration();
  // Turn off IPC client caching so that the suite can handle
  // the restart of the daemons between test cases.
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
      0);
  MiniQJMHACluster miniQjmHaCluster =
      new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
  cluster = miniQjmHaCluster.getDfsCluster();
  jCluster = miniQjmHaCluster.getJournalCluster();
  // Make nn0 active.
  cluster.transitionToActive(0);
  // Do something to generate in-progress edit log data.
  DistributedFileSystem dfs = (DistributedFileSystem)
      HATestUtil.configureFailoverFs(cluster, conf);
  dfs.mkdirs(new Path("/test2"));
  dfs.close();
}

@Before
public void setup() throws Exception {
  Configuration conf = new Configuration();
  // Turn off IPC client caching so that the suite can handle
  // the restart of the daemons between test cases.
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
      0);
  miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
  cluster = miniQjmHaCluster.getDfsCluster();
  jCluster = miniQjmHaCluster.getJournalCluster();
  // Make nn0 active.
  cluster.transitionToActive(0);
  // Do something to generate in-progress edit log data.
  DistributedFileSystem dfs = (DistributedFileSystem)
      HATestUtil.configureFailoverFs(cluster, conf);
  dfs.mkdirs(new Path("/test2"));
  dfs.close();
}

@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs(
        cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // We can read all of the edits logged by the old active from the new
    // active.
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType()
          == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath()
          .equals("/dir" + i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}

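// testNNFailover uses a waitForNextEvents helper that is not shown in this
// section. A minimal sketch, assuming it simply polls the inotify stream
// until a batch of events arrives, might look like:
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
    throws IOException, MissingEventsException {
  EventBatch batch = null;
  // Spin until the stream returns a non-null batch.
  while ((batch = eis.poll()) == null);
  return batch;
}
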
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // The test will fail if the directory cannot be created.
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // A very generous wait period -- the edit will definitely have been
    // processed by the time this is up.
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType()
        == Event.EventType.CREATE);
    Assert.assertEquals("/dir",
        ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}

@Test(timeout = 300000)
public void testQuery() throws Exception {
  final Configuration conf = new Configuration();
  MiniQJMHACluster cluster = null;
  try {
    cluster = new MiniQJMHACluster.Builder(conf).build();
    MiniDFSCluster dfsCluster = cluster.getDfsCluster();
    dfsCluster.waitActive();
    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);

    dfsCluster.shutdownNameNode(1);

    // Start rolling upgrade.
    RollingUpgradeInfo info = dfs
        .rollingUpgrade(RollingUpgradeAction.PREPARE);
    Assert.assertTrue(info.isStarted());

    info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
    Assert.assertFalse(info.createdRollbackImages());

    dfsCluster.restartNameNode(1);

    queryForPreparation(dfs);

    // The NN should have a copy of the fsimage in case of rollbacks.
    Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
        .hasRollbackFSImage());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

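// testQuery, and the multi-NameNode variants later in this section, call a
// queryForPreparation helper that is not shown. A minimal sketch, assuming
// it polls the QUERY action until a rollback fsimage has been created, might
// look like:
static void queryForPreparation(DistributedFileSystem dfs)
    throws IOException, InterruptedException {
  RollingUpgradeInfo info;
  int retries = 0;
  // Poll for up to ~10 seconds for the rollback fsimage to appear.
  while (++retries < 10) {
    info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
    if (info.createdRollbackImages()) {
      break;
    }
    Thread.sleep(1000);
  }
  if (retries >= 10) {
    Assert.fail("Query returned false " + retries + " times");
  }
}
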
@Before
public void setUpCluster() throws Exception {
  conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10);
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  HAUtil.setAllowStandbyReads(conf, true);

  if (clusterType == TestType.SHARED_DIR_HA) {
    MiniDFSNNTopology topology =
        MiniQJMHACluster.createDefaultTopology(10000);
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(topology)
        .numDataNodes(0)
        .checkExitOnShutdown(false)
        .build();
  } else {
    Builder builder = new MiniQJMHACluster.Builder(conf);
    builder.getDfsBuilder().numDataNodes(0).checkExitOnShutdown(false);
    miniQjmHaCluster = builder.build();
    cluster = miniQjmHaCluster.getDfsCluster();
  }

  cluster.waitActive();
  nn0 = cluster.getNameNode(0);
  nn1 = cluster.getNameNode(1);
  cluster.transitionToActive(0);
  fs = HATestUtil.configureFailoverFs(cluster, conf);
}

private static void checkJnPreviousDirExistence(MiniQJMHACluster jnCluster,
    boolean shouldExist) throws IOException {
  for (int i = 0; i < 3; i++) {
    checkPreviousDirExistence(
        jnCluster.getJournalCluster().getJournalDir(i, "ns1"), shouldExist);
  }
  if (shouldExist) {
    assertEpochFilesCopied(jnCluster);
  }
}

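// checkJnPreviousDirExistence delegates to a checkPreviousDirExistence
// helper that is not shown in this section. A minimal sketch, assuming it
// asserts on the presence of the "previous" directory under the given
// storage root, might look like:
private static void checkPreviousDirExistence(File rootDir,
    boolean shouldExist) {
  File previousDir = new File(rootDir, "previous");
  if (shouldExist) {
    assertTrue(previousDir + " does not exist", previousDir.exists());
  } else {
    assertFalse(previousDir + " does exist", previousDir.exists());
  }
}
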
private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster)
    throws IOException {
  Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0)
      .getOrCreateJournal(MiniQJMHACluster.NAMESERVICE);
  BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox
      .getInternalState(journal1, "committedTxnId");
  return committedTxnId != null ? committedTxnId.get() :
      HdfsConstants.INVALID_TXID;
}

private void testQuery(int nnCount) throws Exception {
  final Configuration conf = new Configuration();
  MiniQJMHACluster cluster = null;
  try {
    cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount)
        .build();
    MiniDFSCluster dfsCluster = cluster.getDfsCluster();
    dfsCluster.waitActive();
    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);

    // Shut down the other NNs.
    for (int i = 1; i < nnCount; i++) {
      dfsCluster.shutdownNameNode(i);
    }

    // Start rolling upgrade.
    RollingUpgradeInfo info = dfs
        .rollingUpgrade(RollingUpgradeAction.PREPARE);
    Assert.assertTrue(info.isStarted());

    info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
    Assert.assertFalse(info.createdRollbackImages());

    // Restart the other NNs.
    for (int i = 1; i < nnCount; i++) {
      dfsCluster.restartNameNode(i);
    }

    // Check that one of the other NNs has created the rollback image and
    // uploaded it.
    queryForPreparation(dfs);

    // The NN should have a copy of the fsimage in case of rollbacks.
    Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
        .hasRollbackFSImage());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

public void testCheckpoint(int nnCount) throws IOException,
    InterruptedException {
  final Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);

  MiniQJMHACluster cluster = null;
  final Path foo = new Path("/foo");

  try {
    cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount)
        .build();
    MiniDFSCluster dfsCluster = cluster.getDfsCluster();
    dfsCluster.waitActive();
    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);

    // Start rolling upgrade.
    RollingUpgradeInfo info = dfs
        .rollingUpgrade(RollingUpgradeAction.PREPARE);
    Assert.assertTrue(info.isStarted());

    queryForPreparation(dfs);

    dfs.mkdirs(foo);
    long txid = dfs.rollEdits();
    Assert.assertTrue(txid > 0);

    for (int i = 1; i < nnCount; i++) {
      verifyNNCheckpoint(dfsCluster, txid, i);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

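// testCheckpoint verifies each standby through a verifyNNCheckpoint helper
// that is not shown. A minimal sketch, assuming the standby is expected to
// have persisted an fsimage for the transaction just before the roll, might
// look like:
private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid,
    int nnIndex) throws InterruptedException {
  int retries = 0;
  // Give the standby a few seconds to complete and persist the checkpoint.
  while (++retries < 5) {
    NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage()
        .getStorage();
    if (storage.getFsImageName(txid - 1) != null) {
      return;
    }
    Thread.sleep(1000);
  }
  Assert.fail("new checkpoint does not exist");
}
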
private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster)
    throws IOException {
  Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0)
      .getOrCreateJournal(MiniQJMHACluster.NAMESERVICE);
  BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox
      .getInternalState(journal1, "committedTxnId");
  return committedTxnId != null ? committedTxnId.get() :
      HdfsServerConstants.INVALID_TXID;
}

private void setUpHaCluster(boolean security) throws Exception {
  conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
      security);
  cluster = new MiniQJMHACluster.Builder(conf).build();
  setHAConf(conf, cluster.getDfsCluster().getNameNode(0).getHostAndPort(),
      cluster.getDfsCluster().getNameNode(1).getHostAndPort());
  admin = new DFSAdmin();
  admin.setConf(conf);
  assertTrue(HAUtil.isHAEnabled(conf, "ns1"));

  // Capture stdout/stderr so DFSAdmin output can be asserted on.
  System.setOut(new PrintStream(out));
  System.setErr(new PrintStream(err));
}