/**
 * Process the INode records stored in the fsimage.
 *
 * @param in Datastream to process
 * @param v Visitor to walk over INodes
 * @param numInodes Number of INodes stored in file
 * @param skipBlocks Whether to skip processing the blocks within each INode
 * @param supportSnapshot Whether or not the imageVersion supports snapshot
 * @throws IOException
 */
private void processINodes(DataInputStream in, ImageVisitor v,
    long numInodes, boolean skipBlocks, boolean supportSnapshot)
    throws IOException {
  v.visitEnclosingElement(ImageElement.INODES,
      ImageElement.NUM_INODES, numInodes);

  if (NameNodeLayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
      imageVersion)) {
    if (!supportSnapshot) {
      processLocalNameINodes(in, v, numInodes, skipBlocks);
    } else {
      processLocalNameINodesWithSnapshot(in, v, skipBlocks);
    }
  } else { // full path name
    processFullNameINodes(in, v, numInodes, skipBlocks);
  }

  v.leaveEnclosingElement(); // INodes
}
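/*
 * Illustrative sketch, not part of the original file: HDFS layout versions
 * are negative integers that decrease as features are added, so a check like
 * NameNodeLayoutVersion.supports(...) above amounts to a "newer than or
 * equal to" comparison. The helper and parameter names below are
 * hypothetical.
 */
private static boolean layoutSupports(int featureIntroducedIn, int imageVersion) {
  // A more negative imageVersion means a newer image format; it supports any
  // feature introduced at that version or at a numerically greater one.
  return imageVersion <= featureIntroducedIn;
}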
private void processFileDiff(DataInputStream in, ImageVisitor v,
    String currentINodeName) throws IOException {
  int snapshotId = in.readInt();
  v.visitEnclosingElement(ImageElement.SNAPSHOT_FILE_DIFF,
      ImageElement.SNAPSHOT_DIFF_SNAPSHOTID, snapshotId);
  v.visit(ImageElement.SNAPSHOT_FILE_SIZE, in.readLong());
  if (in.readBoolean()) {
    v.visitEnclosingElement(ImageElement.SNAPSHOT_INODE_FILE_ATTRIBUTES);
    if (NameNodeLayoutVersion.supports(Feature.OPTIMIZE_SNAPSHOT_INODES,
        imageVersion)) {
      processINodeFileAttributes(in, v, currentINodeName);
    } else {
      processINode(in, v, true, currentINodeName, true);
    }
    v.leaveEnclosingElement();
  }
  v.leaveEnclosingElement();
}
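/*
 * For reference (an inferred summary of the reads above, not a normative
 * spec): each file diff record is laid out as
 *   int     snapshotId
 *   long    fileSize
 *   boolean hasSnapshotCopy
 *   [INode file attributes | full INode]  -- present only if hasSnapshotCopy,
 * with the attribute encoding chosen by the OPTIMIZE_SNAPSHOT_INODES feature.
 */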
/**
 * Ensure that during downgrade the NN fails to load a fsimage with newer
 * format.
 */
@Test(expected = IncorrectVersionException.class)
public void testRejectNewFsImage() throws IOException {
  final Configuration conf = new Configuration();
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    fs.saveNamespace();
    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
    NNStorage storage = spy(cluster.getNameNode().getFSImage().getStorage());
    // Layout versions are negative and decrease over time, so subtracting 1
    // from CURRENT_LAYOUT_VERSION simulates a newer, future image format.
    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
    doReturn(futureVersion).when(storage).getServiceLayoutVersion();
    storage.writeAll();
    cluster.restartNameNode(0, true, "-rollingUpgrade", "downgrade");
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
private static void createEmptyInProgressEditLog(MiniDFSCluster cluster,
    NameNode nn, boolean writeHeader) throws IOException {
  long txid = nn.getNamesystem().getEditLog().getLastWrittenTxId();
  URI sharedEditsUri = cluster.getSharedEditsDir(0, 1);
  File sharedEditsDir = new File(sharedEditsUri.getPath());
  StorageDirectory storageDir = new StorageDirectory(sharedEditsDir);
  File inProgressFile = NameNodeAdapter.getInProgressEditsFile(storageDir,
      txid + 1);
  assertTrue("Failed to create in-progress edits file",
      inProgressFile.createNewFile());

  if (writeHeader) {
    DataOutputStream out = new DataOutputStream(new FileOutputStream(
        inProgressFile));
    EditLogFileOutputStream.writeHeader(
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, out);
    out.close();
  }
}
private long writeSegmentUntilCrash(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long txid, int numTxns,
    Holder<Throwable> thrown) {

  long firstTxId = txid;
  long lastAcked = txid - 1;
  try {
    EditLogOutputStream stm = qjm.startLogSegment(txid,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

    for (int i = 0; i < numTxns; i++) {
      QJMTestUtil.writeTxns(stm, txid++, 1);
      lastAcked++;
    }

    stm.close();
    qjm.finalizeLogSegment(firstTxId, lastAcked);
  } catch (Throwable t) {
    thrown.held = t;
  }
  return lastAcked;
}
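/*
 * Hypothetical usage sketch (not from the original suite): the caller checks
 * the Holder to see whether the injected fault fired before every transaction
 * was acknowledged. "cluster" and "qjm" are assumed to be set up as in the
 * surrounding tests.
 */
private void exampleWriteUntilCrash() throws Exception {
  Holder<Throwable> thrown = new Holder<Throwable>(null);
  long lastAcked = writeSegmentUntilCrash(cluster, qjm, 1, 3, thrown);
  if (thrown.held != null) {
    // The crash happened mid-segment, so fewer than 3 txns were acknowledged.
    assertTrue(lastAcked < 3);
  } else {
    assertEquals(3, lastAcked);
  }
}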
/**
 * Test the case where the NN crashes after starting a new segment
 * on all nodes, but before writing the first transaction to it.
 */
@Test
public void testCrashAtBeginningOfSegment() throws Exception {
  writeSegment(cluster, qjm, 1, 3, true);
  waitForAllPendingCalls(qjm.getLoggerSetForTests());

  EditLogOutputStream stm = qjm.startLogSegment(4,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  try {
    waitForAllPendingCalls(qjm.getLoggerSetForTests());
  } finally {
    stm.abort();
  }

  // Make a new QJM
  qjm = closeLater(new QuorumJournalManager(
      conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO));
  qjm.recoverUnfinalizedSegments();
  checkRecovery(cluster, 1, 3);

  writeSegment(cluster, qjm, 4, 3, true);
}
/**
 * Set up the loggers into the following state:
 * - JN0: edits 1-3 in progress
 * - JN1: edits 1-4 in progress
 * - JN2: edits 1-5 in progress
 *
 * None of the loggers have any associated paxos info.
 */
private void setupLoggers345() throws Exception {
  EditLogOutputStream stm = qjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

  failLoggerAtTxn(spies.get(0), 4);
  failLoggerAtTxn(spies.get(1), 5);

  writeTxns(stm, 1, 3);

  // This should succeed to 2/3 loggers
  writeTxns(stm, 4, 1);

  // This should only succeed to 1 logger (index 2). Hence it should fail.
  try {
    writeTxns(stm, 5, 1);
    fail("Did not fail to write when only a minority succeeded");
  } catch (QuorumException qe) {
    GenericTestUtils.assertExceptionContains(
        "too many exceptions to achieve quorum size 2/3",
        qe);
  }
}
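/*
 * Hypothetical follow-up to setupLoggers345() (a sketch, not a test from the
 * suite): a fresh QJM instance runs recovery against the 3/4/5 state. The
 * recovered end txid depends on which loggers participate in recovery, so no
 * specific value is asserted here.
 */
private void exampleRecoverAfter345() throws Exception {
  setupLoggers345();
  QuorumJournalManager recoveryQjm = closeLater(new QuorumJournalManager(
      conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO));
  recoveryQjm.recoverUnfinalizedSegments();
}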
@Test
public void testQuorumOfLoggersFail() throws Exception {
  futureReturns(null).when(spyLoggers.get(0)).startLogSegment(
      Mockito.anyLong(),
      Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
  futureThrows(new IOException("logger failed"))
      .when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
          Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
  futureThrows(new IOException("logger failed"))
      .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
          Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
  try {
    qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    fail("Did not throw when quorum failed");
  } catch (QuorumException qe) {
    GenericTestUtils.assertExceptionContains("logger failed", qe);
  }
}
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

  StopWatch sw = new StopWatch().start();
  // Inclusive bound: send exactly numEdits batches, matching the counts
  // reported below.
  for (int i = 1; i <= numEdits; i++) {
    ch.sendEdits(1L, i, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);

  System.err.println("Wrote " + numEdits + " batches of " + editsSize
      + " bytes in " + time + "ms");
  float avgRtt = (float) time / (float) numEdits;
  long throughput = ((long) numEdits * editsSize * 1000L) / time;
  System.err.println("Time per batch: " + avgRtt + "ms");
  System.err.println("Throughput: " + throughput + " bytes/sec");
}
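/*
 * Hypothetical driver (a sketch, not from the original suite): exercise the
 * perf loop with an arbitrary 8 KB batch size over 1024 batches; the sizes
 * are illustrative values, not tuned benchmarks.
 */
@Test(timeout = 100000)
public void examplePerformance() throws Exception {
  doPerfTest(8192, 1024);
}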
/**
 * Test whether JNs can correctly handle an editlog that cannot be decoded.
 */
@Test
public void testScanEditLog() throws Exception {
  // use a future layout version
  journal.startLogSegment(makeRI(1), 1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1);

  // in the segment we write garbage editlog, which can be scanned but
  // cannot be decoded
  final int numTxns = 5;
  byte[] ops = QJMTestUtil.createGabageTxns(1, numTxns);
  journal.journal(makeRI(2), 1, 1, numTxns, ops);

  // verify the in-progress editlog segment
  SegmentStateProto segmentState = journal.getSegmentInfo(1);
  assertTrue(segmentState.getIsInProgress());
  Assert.assertEquals(numTxns, segmentState.getEndTxId());
  Assert.assertEquals(1, segmentState.getStartTxId());

  // finalize the segment and verify it again
  journal.finalizeLogSegment(makeRI(3), 1, numTxns);
  segmentState = journal.getSegmentInfo(1);
  assertFalse(segmentState.getIsInProgress());
  Assert.assertEquals(numTxns, segmentState.getEndTxId());
  Assert.assertEquals(1, segmentState.getStartTxId());
}
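/*
 * For context, a plausible shape for the makeRI(...) helper used above (an
 * assumption about the test fixture, mirroring the RequestInfo constructor
 * seen in the next test): JID and the epoch are fixture constants.
 */
private static RequestInfo makeRI(int serialNumber) {
  return new RequestInfo(JID, 1, serialNumber, 0);
}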
@Test(timeout = 10000)
public void testFormatResetsCachedValues() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 12345L);
  journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

  assertEquals(12345L, journal.getLastPromisedEpoch());
  assertEquals(12345L, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());

  // Close the journal in preparation for reformatting it.
  journal.close();
  journal.format(FAKE_NSINFO_2);

  assertEquals(0, journal.getLastPromisedEpoch());
  assertEquals(0, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());
}
public static EditLogOutputStream writeSegment(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long startTxId, int numTxns,
    boolean finalize) throws IOException {
  EditLogOutputStream stm = qjm.startLogSegment(startTxId,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  // Should create in-progress
  assertExistsInQuorum(cluster,
      NNStorage.getInProgressEditsFileName(startTxId));

  writeTxns(stm, startTxId, numTxns);
  if (finalize) {
    stm.close();
    qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
    return null;
  } else {
    return stm;
  }
}
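/*
 * Hypothetical usage (mirroring calls made elsewhere in these tests): a
 * finalized segment returns null, while finalize=false hands back the live
 * stream so the caller can keep writing or abort it.
 */
private static void exampleWriteSegments(MiniJournalCluster cluster,
    QuorumJournalManager qjm) throws IOException {
  writeSegment(cluster, qjm, 1, 3, true); // txns 1-3, finalized
  EditLogOutputStream open = writeSegment(cluster, qjm, 4, 2, false);
  open.abort(); // txns 4-5 left in progress, then aborted
}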
@Test
public void testSimpleWrite() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
  bkjm.format(nsi);

  EditLogOutputStream out = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  for (long i = 1; i <= 100; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  bkjm.finalizeLogSegment(1, 100);

  String zkpath = bkjm.finalizedLedgerZNode(1, 100);
  assertNotNull(zkc.exists(zkpath, false));
  assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
}
@Test
public void testNumberOfTransactions() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
  bkjm.format(nsi);

  EditLogOutputStream out = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  for (long i = 1; i <= 100; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  bkjm.finalizeLogSegment(1, 100);

  long numTrans = bkjm.getNumberOfTransactions(1, true);
  assertEquals(100, numTrans);
}
@Test
public void testTwoWriters() throws Exception {
  long start = 1;
  NamespaceInfo nsi = newNSInfo();

  BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
  bkjm1.format(nsi);

  BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);

  EditLogOutputStream out1 = bkjm1.startLogSegment(start,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  try {
    bkjm2.startLogSegment(start,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    fail("Shouldn't have been able to open the second writer");
  } catch (IOException ioe) {
    LOG.info("Caught exception as expected", ioe);
  } finally {
    out1.close();
  }
}
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
    int startTxid, int endTxid)
    throws IOException, KeeperException, InterruptedException {
  EditLogOutputStream out = bkjm.startLogSegment(startTxid,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  for (long i = startTxid; i <= endTxid; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  // finalize the in-progress log segment
  bkjm.finalizeLogSegment(startTxid, endTxid);
  String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
  assertNotNull(zkc.exists(zkpath1, false));
  assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
  return zkpath1;
}
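/*
 * Hypothetical usage sketch (not part of the original suite): lay down two
 * consecutive finalized segments; "bkjm" is assumed to be constructed and
 * formatted as in the tests above.
 */
private void exampleTwoFinalizedSegments(BookKeeperJournalManager bkjm)
    throws IOException, KeeperException, InterruptedException {
  String first = startAndFinalizeLogSegment(bkjm, 1, 100);
  String second = startAndFinalizeLogSegment(bkjm, 101, 200);
  assertNotNull(first);
  assertNotNull(second);
}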
/**
 * Ensure that restarting the namenode with the downgrade option throws an
 * exception, since the option is now obsolete.
 */
@Test(expected = IllegalArgumentException.class)
public void testRejectNewFsImage() throws IOException {
  final Configuration conf = new Configuration();
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    fs.saveNamespace();
    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
    NNStorage storage = spy(cluster.getNameNode().getFSImage().getStorage());
    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
    doReturn(futureVersion).when(storage).getServiceLayoutVersion();
    storage.writeAll();
    cluster.restartNameNode(0, true, "-rollingUpgrade", "downgrade");
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}