/**
 * Test for the case when the SBN is configured to checkpoint based
 * on a time period, but no transactions are happening on the
 * active. Thus, it would want to save a second checkpoint at the
 * same txid, which is a no-op. This test makes sure this doesn't
 * cause any problem.
 */
@Test
public void testCheckpointWhenNoNewTransactionsHappened()
    throws Exception {
  // Checkpoint as fast as we can, in a tight loop.
  cluster.getConfiguration(1).setInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
  cluster.restartNameNode(1);
  nn1 = cluster.getNameNode(1);

  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);

  // We shouldn't save any checkpoints at txid=0
  Thread.sleep(1000);
  Mockito.verify(spyImage1, Mockito.never())
      .saveNamespace((FSNamesystem) Mockito.anyObject());

  // Roll the primary and wait for the standby to catch up
  HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
  Thread.sleep(2000);

  // We should make exactly one checkpoint at this new txid.
  Mockito.verify(spyImage1, Mockito.times(1))
      .saveNamespace((FSNamesystem) Mockito.anyObject(),
          (Canceler) Mockito.anyObject());
}
/**
 * Test for the case when the SBN is configured to checkpoint based
 * on a time period, but no transactions are happening on the
 * active. Thus, it would want to save a second checkpoint at the
 * same txid, which is a no-op. This test makes sure this doesn't
 * cause any problem.
 */
@Test
public void testCheckpointWhenNoNewTransactionsHappened()
    throws Exception {
  // Checkpoint as fast as we can, in a tight loop.
  cluster.getConfiguration(1).setInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
  cluster.restartNameNode(1);
  nn1 = cluster.getNameNode(1);

  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);

  // We shouldn't save any checkpoints at txid=0
  Thread.sleep(1000);
  Mockito.verify(spyImage1, Mockito.never())
      .saveNamespace((FSNamesystem) Mockito.anyObject());

  // Roll the primary and wait for the standby to catch up
  HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
  Thread.sleep(2000);

  // We should make exactly one checkpoint at this new txid.
  Mockito.verify(spyImage1, Mockito.times(1)).saveNamespace(
      (FSNamesystem) Mockito.anyObject(),
      Mockito.eq(NameNodeFile.IMAGE),
      (Canceler) Mockito.anyObject());
}
private void checkImages(FSNamesystem fsn) throws Exception {
  Iterator<StorageDirectory> iter = fsn
      .getFSImage().dirIterator(FSImage.NameNodeDirType.IMAGE);
  List<Long> checksums = new ArrayList<Long>();
  while (iter.hasNext()) {
    StorageDirectory sd = iter.next();
    File fsImage = FSImage.getImageFile(sd, FSImage.NameNodeFile.IMAGE);
    PureJavaCrc32 crc = new PureJavaCrc32();
    FileInputStream in = new FileInputStream(fsImage);
    try {
      byte[] buff = new byte[4096];
      int read = 0;
      while ((read = in.read(buff)) != -1) {
        crc.update(buff, 0, read);
      }
    } finally {
      in.close(); // don't leak a file descriptor per storage directory
    }
    checksums.add(crc.getValue());
  }
  assertTrue("Not enough fsimage copies in MiniDFSCluster "
      + "to test parallel write", checksums.size() > 1);
  for (int i = 1; i < checksums.size(); i++) {
    assertEquals(checksums.get(i - 1), checksums.get(i));
  }
}
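/*
 * A minimal, self-contained sketch of the checksum-comparison technique
 * used by checkImages above, with no Hadoop dependencies: compute a CRC32
 * over each replica of a file and assert that all replicas are
 * byte-identical. The replica paths below are hypothetical examples, and
 * the JDK's CRC32 stands in for Hadoop's PureJavaCrc32.
 */
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class ReplicaChecksumCheck {
  static long crc32Of(String path) throws IOException {
    CRC32 crc = new CRC32();
    try (FileInputStream in = new FileInputStream(path)) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        crc.update(buf, 0, n);
      }
    }
    return crc.getValue();
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical replica locations of the same image file.
    String[] replicas = { "/data1/current/fsimage", "/data2/current/fsimage" };
    long expected = crc32Of(replicas[0]);
    for (String r : replicas) {
      if (crc32Of(r) != expected) {
        throw new AssertionError("Replica " + r + " differs");
      }
    }
    System.out.println("All replicas identical (CRC32=" + expected + ")");
  }
}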
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
    long curTxIdOnOtherNode) {

  if (imageTxId == curTxIdOnOtherNode) {
    // The other node hasn't written any logs since the last checkpoint.
    // This can be the case if the NN was freshly formatted as HA, and
    // then started in standby mode, so it has no edit logs at all.
    return true;
  }
  long firstTxIdInLogs = imageTxId + 1;

  assert curTxIdOnOtherNode >= firstTxIdInLogs :
      "first=" + firstTxIdInLogs + " onOtherNode=" + curTxIdOnOtherNode;

  try {
    Collection<EditLogInputStream> streams =
        image.getEditLog().selectInputStreams(
            firstTxIdInLogs, curTxIdOnOtherNode, null, true);
    for (EditLogInputStream stream : streams) {
      IOUtils.closeStream(stream);
    }
    return true;
  } catch (IOException e) {
    String msg = "Unable to read transaction ids " +
        firstTxIdInLogs + "-" + curTxIdOnOtherNode +
        " from the configured shared edits storage " +
        Joiner.on(",").join(sharedEditsUris) + ". " +
        "Please copy these logs into the shared edits storage " +
        "or call saveNamespace on the active node.\n" +
        "Error: " + e.getLocalizedMessage();
    if (LOG.isDebugEnabled()) {
      LOG.fatal(msg, e);
    } else {
      LOG.fatal(msg);
    }
    return false;
  }
}
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
    long curTxIdOnOtherNode) {

  if (imageTxId == curTxIdOnOtherNode) {
    // The other node hasn't written any logs since the last checkpoint.
    // This can be the case if the NN was freshly formatted as HA, and
    // then started in standby mode, so it has no edit logs at all.
    return true;
  }
  long firstTxIdInLogs = imageTxId + 1;

  assert curTxIdOnOtherNode >= firstTxIdInLogs :
      "first=" + firstTxIdInLogs + " onOtherNode=" + curTxIdOnOtherNode;

  try {
    Collection<EditLogInputStream> streams =
        image.getEditLog().selectInputStreams(
            firstTxIdInLogs, curTxIdOnOtherNode, null, true);
    for (EditLogInputStream stream : streams) {
      IOUtils.closeStream(stream);
    }
    return true;
  } catch (IOException e) {
    String msg = "Unable to read transaction ids " +
        firstTxIdInLogs + "-" + curTxIdOnOtherNode +
        " from the configured shared edits storage " +
        Joiner.on(",").join(sharedEditsUris) + ". " +
        "Please copy these logs into the shared edits storage " +
        "or call saveNamespace on the active node.\n" +
        "Error: " + e.getLocalizedMessage();
    if (LOG.isDebugEnabled()) {
      // This failure is fatal either way; attach the stack trace only
      // when debug logging is enabled.
      LOG.fatal(msg, e);
    } else {
      LOG.fatal(msg);
    }
    return false;
  }
}
CheckpointSignature(FSImage fsImage) {
  super(fsImage.getStorage());
  blockpoolID = fsImage.getBlockPoolID();
  mostRecentCheckpointTxId = fsImage.getStorage()
      .getMostRecentCheckpointTxId();
  curSegmentTxId = fsImage.getEditLog().getCurSegmentTxId();
}
void validateStorageInfo(FSImage si) throws IOException {
  if (!isSameCluster(si)
      || !storageVersionMatches(si.getStorage())) {
    throw new IOException("Inconsistent checkpoint fields.\n" +
        "LV = " + layoutVersion + " namespaceID = " + namespaceID +
        " cTime = " + cTime +
        " ; clusterId = " + clusterID +
        " ; blockpoolId = " + blockpoolID +
        ".\nExpecting respectively: " +
        si.getStorage().layoutVersion + "; " +
        si.getStorage().namespaceID + "; " + si.getStorage().cTime +
        "; " + si.getClusterID() + "; " +
        si.getBlockPoolID() + ".");
  }
}
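/*
 * A sketch of the two predicates used in validateStorageInfo above,
 * inferred from the fields reported in the exception message (the exact
 * production bodies may differ): isSameCluster compares the cluster
 * identity fields, while storageVersionMatches compares the
 * version-related fields.
 */
boolean isSameCluster(FSImage si) {
  return namespaceID == si.getStorage().namespaceID
      && clusterID.equals(si.getClusterID())
      && blockpoolID.equals(si.getBlockPoolID());
}

boolean storageVersionMatches(StorageInfo si) {
  return layoutVersion == si.layoutVersion && cTime == si.cTime;
}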
/**
 * Make sure that clients will receive StandbyExceptions even when a
 * checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
 * thread will have FSNS lock. Regression test for HDFS-4591.
 */
@Test(timeout=300000)
public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {

  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.any(Canceler.class));

  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);

  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  try {
    // Perform an RPC to the SBN and make sure it throws a StandbyException.
    nn1.getRpcServer().getFileInfo("/");
    fail("Should have thrown StandbyException, but instead succeeded.");
  } catch (StandbyException se) {
    GenericTestUtils.assertExceptionContains("is not supported", se);
  }

  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  // Only now release the blocked saveNamespace call and let the checkpoint
  // complete; releasing it any earlier would let the checkpoint finish
  // before the assertions above run.
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
}
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
    long curTxIdOnOtherNode) {

  if (imageTxId == curTxIdOnOtherNode) {
    // The other node hasn't written any logs since the last checkpoint.
    // This can be the case if the NN was freshly formatted as HA, and
    // then started in standby mode, so it has no edit logs at all.
    return true;
  }
  long firstTxIdInLogs = imageTxId + 1;

  assert curTxIdOnOtherNode >= firstTxIdInLogs :
      "first=" + firstTxIdInLogs + " onOtherNode=" + curTxIdOnOtherNode;

  try {
    Collection<EditLogInputStream> streams =
        image.getEditLog().selectInputStreams(
            firstTxIdInLogs, curTxIdOnOtherNode, null, true, false);
    for (EditLogInputStream stream : streams) {
      IOUtils.closeStream(stream);
    }
    return true;
  } catch (IOException e) {
    String msg = "Unable to read transaction ids " +
        firstTxIdInLogs + "-" + curTxIdOnOtherNode +
        " from the configured shared edits storage " +
        Joiner.on(",").join(sharedEditsUris) + ". " +
        "Please copy these logs into the shared edits storage " +
        "or call saveNamespace on the active node.\n" +
        "Error: " + e.getLocalizedMessage();
    if (LOG.isDebugEnabled()) {
      LOG.fatal(msg, e);
    } else {
      LOG.fatal(msg);
    }
    return false;
  }
}
/**
 * Make sure that clients will receive StandbyExceptions even when a
 * checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
 * thread will have FSNS lock. Regression test for HDFS-4591.
 */
@Test(timeout=300000)
public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {

  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.eq(NameNodeFile.IMAGE), Mockito.any(Canceler.class));

  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);

  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  try {
    // Perform an RPC to the SBN and make sure it throws a StandbyException.
    nn1.getRpcServer().getFileInfo("/");
    fail("Should have thrown StandbyException, but instead succeeded.");
  } catch (StandbyException se) {
    GenericTestUtils.assertExceptionContains("is not supported", se);
  }

  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
}
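/*
 * A simplified, self-contained sketch of the DelayAnswer coordination
 * pattern used in the two tests above. The real helper lives in Hadoop's
 * test utilities; this illustrative version supports a single stubbed
 * invocation, which is all these tests need. The test thread can observe
 * exactly when the stubbed method is entered, hold it there, and then
 * release it.
 */
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

class SimpleDelayAnswer implements Answer<Object> {
  private final CountDownLatch fired = new CountDownLatch(1);
  private final CountDownLatch allowed = new CountDownLatch(1);
  private final CountDownLatch resulted = new CountDownLatch(1);
  private final AtomicInteger fireCount = new AtomicInteger();
  private final AtomicInteger resultCount = new AtomicInteger();

  @Override
  public Object answer(InvocationOnMock invocation) throws Throwable {
    fireCount.incrementAndGet();
    fired.countDown();          // let waitForCall() return
    allowed.await();            // block here until proceed() is called
    Object result = invocation.callRealMethod();
    resultCount.incrementAndGet();
    resulted.countDown();       // let waitForResult() return
    return result;
  }

  void waitForCall() throws InterruptedException { fired.await(); }
  void proceed() { allowed.countDown(); }
  void waitForResult() throws InterruptedException { resulted.await(); }
  int getFireCount() { return fireCount.get(); }
  int getResultCount() { return resultCount.get(); }
}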
void validateStorageInfo(FSImage si) throws IOException {
  if (layoutVersion != si.layoutVersion
      || namespaceID != si.namespaceID || cTime != si.cTime
      || checkpointTime != si.checkpointTime
      || !imageDigest.equals(si.imageDigest)) {
    throw new IOException("Inconsistent checkpoint fields.\n"
        + "LV = " + layoutVersion + " namespaceID = " + namespaceID
        + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
        + " ; imageDigest = " + imageDigest
        + ".\nExpecting respectively: "
        + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
        + "; " + si.checkpointTime + "; " + si.imageDigest);
  }
}
/**
 * Load checkpoint from local files only if the memory state is empty.<br>
 * Set new checkpoint time received from the name-node.<br>
 * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
 * @throws IOException
 */
void loadCheckpoint(CheckpointSignature sig) throws IOException {
  // load current image and journal if it is not in memory already
  if (!editLog.isOpen())
    editLog.open();

  FSDirectory fsDir = getFSNamesystem().dir;
  if (fsDir.isEmpty()) {
    Iterator<StorageDirectory> itImage
        = dirIterator(NameNodeDirType.IMAGE);
    Iterator<StorageDirectory> itEdits
        = dirIterator(NameNodeDirType.EDITS);
    if (!itImage.hasNext() || !itEdits.hasNext())
      throw new IOException("Could not locate checkpoint directories");
    StorageDirectory sdName = itImage.next();
    StorageDirectory sdEdits = itEdits.next();
    fsDir.writeLock();
    try { // load image under rootDir lock
      // make sure image checksum is verified against the expected value
      imageDigest = sig.imageDigest;
      loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
    } finally {
      fsDir.writeUnlock();
    }
    loadFSEdits(sdEdits);
  }

  // set storage fields
  setStorageInfo(sig);
  imageDigest = sig.imageDigest;
  checkpointTime = sig.checkpointTime;
  fsDir.setReady();
  getFSNamesystem().setBlockTotal();
}
@VisibleForTesting
void doTailEdits() throws IOException, InterruptedException {
  // Write lock needs to be interruptible here because the
  // transitionToActive RPC takes the write lock before calling
  // tailer.stop() -- so if we're not interruptible, it will
  // deadlock.
  namesystem.writeLockInterruptibly();
  try {
    FSImage image = namesystem.getFSImage();

    long lastTxnId = image.getLastAppliedTxId();

    if (LOG.isDebugEnabled()) {
      LOG.debug("lastTxnId: " + lastTxnId);
    }
    Collection<EditLogInputStream> streams;
    try {
      streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
    } catch (IOException ioe) {
      // This is acceptable. If we try to tail edits in the middle of an edits
      // log roll, i.e. the last one has been finalized but the new inprogress
      // edits file hasn't been started yet.
      LOG.warn("Edits tailer failed to find any streams. Will try again " +
          "later.", ioe);
      return;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("edit streams to load from: " + streams.size());
    }

    // Once we have streams to load, errors encountered are legitimate cause
    // for concern, so we don't catch them here. Simple errors reading from
    // disk are ignored.
    long editsLoaded = 0;
    try {
      editsLoaded = image.loadEdits(streams, namesystem);
    } catch (EditLogInputException elie) {
      editsLoaded = elie.getNumEditsLoaded();
      throw elie;
    } finally {
      if (editsLoaded > 0 || LOG.isDebugEnabled()) {
        LOG.info(String.format("Loaded %d edits starting from txid %d ",
            editsLoaded, lastTxnId));
      }
    }

    if (editsLoaded > 0) {
      lastLoadTimeMs = monotonicNow();
    }
    lastLoadedTxnId = image.getLastAppliedTxId();
  } finally {
    namesystem.writeUnlock();
  }
}
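/*
 * A minimal, generic sketch (JDK only) of why the tailer above must take
 * the write lock interruptibly: a stopping thread that already holds the
 * lock can interrupt the tailer out of its lock wait instead of
 * deadlocking against it. The class and method names here are
 * illustrative, not Hadoop APIs.
 */
import java.util.concurrent.locks.ReentrantReadWriteLock;

class InterruptibleTailerSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile Thread tailerThread;

  void tailOnce() throws InterruptedException {
    tailerThread = Thread.currentThread();
    // Interruptible acquisition: stop() below can break us out of this wait.
    lock.writeLock().lockInterruptibly();
    try {
      // ... apply edits while holding the write lock ...
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Called by a thread that already holds the write lock (analogous to
  // the transitionToActive path described in the comment above).
  void stop() {
    Thread t = tailerThread;
    if (t != null) {
      t.interrupt(); // wakes a tailer blocked in lockInterruptibly()
    }
  }
}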
private long countUncheckpointedTxns() {
  FSImage img = namesystem.getFSImage();
  return img.getLastAppliedOrWrittenTxId() -
      img.getStorage().getMostRecentCheckpointTxId();
}
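/*
 * A plausible sketch (not the verbatim production code) of how a standby
 * checkpointer could use this count: trigger a checkpoint once either the
 * uncheckpointed-transaction threshold or the configured time period has
 * been exceeded. The method and parameter names are illustrative.
 */
private boolean shouldCheckpoint(long uncheckpointedTxns,
    long secsSinceLastCheckpoint, long txnThreshold, long periodSecs) {
  // Checkpoint if enough transactions have accumulated ...
  if (uncheckpointedTxns >= txnThreshold) {
    return true;
  }
  // ... or if enough wall-clock time has passed, as exercised by the
  // dfs.namenode.checkpoint.period-driven tests earlier in this section.
  return secsSinceLastCheckpoint >= periodSecs;
}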
@Test (timeout = 300000)
public void testFinalize() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = null;
  final Path foo = new Path("/foo");
  final Path bar = new Path("/bar");

  try {
    cluster = new MiniQJMHACluster.Builder(conf).build();
    MiniDFSCluster dfsCluster = cluster.getDfsCluster();
    dfsCluster.waitActive();

    // let NN1 tail editlog every 1s
    dfsCluster.getConfiguration(1).setInt(
        DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    dfsCluster.restartNameNode(1);

    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
    dfs.mkdirs(foo);

    FSImage fsimage = dfsCluster.getNamesystem(0).getFSImage();

    // start rolling upgrade
    RollingUpgradeInfo info = dfs
        .rollingUpgrade(RollingUpgradeAction.PREPARE);
    Assert.assertTrue(info.isStarted());
    dfs.mkdirs(bar);

    queryForPreparation(dfs);

    // The NN should have a copy of the fsimage in case of rollbacks.
    Assert.assertTrue(fsimage.hasRollbackFSImage());

    info = dfs.rollingUpgrade(RollingUpgradeAction.FINALIZE);
    Assert.assertTrue(info.isFinalized());
    Assert.assertTrue(dfs.exists(foo));

    // Once finalized, there should be no more fsimage for rollbacks.
    Assert.assertFalse(fsimage.hasRollbackFSImage());

    // Should have no problem in restart and replaying edits that include
    // the FINALIZE op.
    dfsCluster.restartNameNode(0);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
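/*
 * A plausible sketch of the queryForPreparation helper referenced by
 * testFinalize above (and by its multi-NN variant later in this section);
 * the exact production helper may differ. It polls the QUERY action until
 * the NameNode reports that rollback images have been created, failing
 * after a bounded number of retries.
 */
static void queryForPreparation(DistributedFileSystem dfs)
    throws IOException, InterruptedException {
  RollingUpgradeInfo info;
  int retries = 0;
  while (++retries < 10) {
    info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
    if (info.createdRollbackImages()) {
      return; // rollback fsimage is in place; PREPARE has taken effect
    }
    Thread.sleep(1000);
  }
  Assert.fail("Rollback image was not created within the retry budget");
}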
private void testUpgrade(UpgradeState state) throws Exception {
  cluster.transitionToActive(0);
  final Configuration confNN1 = cluster.getConfiguration(1);

  final File current = cluster.getNameNode(1).getFSImage().getStorage()
      .getStorageDir(0).getCurrentDir();
  final File tmp = cluster.getNameNode(1).getFSImage().getStorage()
      .getStorageDir(0).getPreviousTmp();

  // shut down nn1
  cluster.shutdownNameNode(1);

  // make NN0 in upgrade state
  FSImage fsImage0 = cluster.getNameNode(0).getNamesystem().getFSImage();
  Whitebox.setInternalState(fsImage0, "isUpgradeFinalized", false);

  switch (state) {
    case RECOVER:
      // rename the current directory to previous.tmp in nn1
      NNStorage.rename(current, tmp);
      break;
    case FORMAT:
      // rename the current directory to a random name so it's not formatted
      final File wrongPath = new File(current.getParentFile(), "wrong");
      NNStorage.rename(current, wrongPath);
      break;
    default:
      break;
  }

  int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
  assertEquals(0, rc);

  // Should have copied over the namespace from the standby
  FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, ImmutableList.of(0));
  FSImageTestUtil.assertNNFilesMatch(cluster);

  // make sure the NN1 is in upgrade state, i.e., the previous directory has
  // been successfully created
  cluster.restartNameNode(1);
  assertFalse(cluster.getNameNode(1).getNamesystem().isUpgradeFinalized());
}
@VisibleForTesting
void doTailEdits() throws IOException, InterruptedException {
  // Write lock needs to be interruptible here because the
  // transitionToActive RPC takes the write lock before calling
  // tailer.stop() -- so if we're not interruptible, it will
  // deadlock.
  namesystem.writeLockInterruptibly();
  try {
    FSImage image = namesystem.getFSImage();

    long lastTxnId = image.getLastAppliedTxId();

    if (LOG.isDebugEnabled()) {
      LOG.debug("lastTxnId: " + lastTxnId);
    }
    Collection<EditLogInputStream> streams;
    try {
      streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
    } catch (IOException ioe) {
      // This is acceptable. If we try to tail edits in the middle of an edits
      // log roll, i.e. the last one has been finalized but the new inprogress
      // edits file hasn't been started yet.
      LOG.warn("Edits tailer failed to find any streams. Will try again " +
          "later.", ioe);
      return;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("edit streams to load from: " + streams.size());
    }

    // Once we have streams to load, errors encountered are legitimate cause
    // for concern, so we don't catch them here. Simple errors reading from
    // disk are ignored.
    long editsLoaded = 0;
    try {
      editsLoaded = image.loadEdits(streams, namesystem);
    } catch (EditLogInputException elie) {
      editsLoaded = elie.getNumEditsLoaded();
      throw elie;
    } finally {
      if (editsLoaded > 0 || LOG.isDebugEnabled()) {
        // info-level, so that loaded batches are visible even when debug
        // logging is off; otherwise the editsLoaded > 0 guard would be moot.
        LOG.info(String.format("Loaded %d edits starting from txid %d ",
            editsLoaded, lastTxnId));
      }
    }

    if (editsLoaded > 0) {
      lastLoadTimeMs = monotonicNow();
    }
    lastLoadedTxnId = image.getLastAppliedTxId();
  } finally {
    namesystem.writeUnlock();
  }
}
private void testFinalize(int nnCount) throws Exception {
  final Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = null;
  final Path foo = new Path("/foo");
  final Path bar = new Path("/bar");

  try {
    cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
    MiniDFSCluster dfsCluster = cluster.getDfsCluster();
    dfsCluster.waitActive();

    // let other NN tail editlog every 1s
    for (int i = 1; i < nnCount; i++) {
      dfsCluster.getConfiguration(i).setInt(
          DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    }
    dfsCluster.restartNameNodes();

    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
    dfs.mkdirs(foo);

    FSImage fsimage = dfsCluster.getNamesystem(0).getFSImage();

    // start rolling upgrade
    RollingUpgradeInfo info = dfs
        .rollingUpgrade(RollingUpgradeAction.PREPARE);
    Assert.assertTrue(info.isStarted());
    dfs.mkdirs(bar);

    queryForPreparation(dfs);

    // The NN should have a copy of the fsimage in case of rollbacks.
    Assert.assertTrue(fsimage.hasRollbackFSImage());

    info = dfs.rollingUpgrade(RollingUpgradeAction.FINALIZE);
    Assert.assertTrue(info.isFinalized());
    Assert.assertTrue(dfs.exists(foo));

    // Once finalized, there should be no more fsimage for rollbacks.
    Assert.assertFalse(fsimage.hasRollbackFSImage());

    // Should have no problem in restart and replaying edits that include
    // the FINALIZE op.
    dfsCluster.restartNameNode(0);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}