private long writeSegmentUntilCrash(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long txid, int numTxns,
    Holder<Throwable> thrown) {
  long firstTxId = txid;
  long lastAcked = txid - 1;
  try {
    EditLogOutputStream stm = qjm.startLogSegment(txid,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (int i = 0; i < numTxns; i++) {
      QJMTestUtil.writeTxns(stm, txid++, 1);
      lastAcked++;
    }
    stm.close();
    qjm.finalizeLogSegment(firstTxId, lastAcked);
  } catch (Throwable t) {
    thrown.held = t;
  }
  return lastAcked;
}
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  cluster = new MiniJournalCluster.Builder(conf)
      .build();
  qjm = createSpyingQJM();
  spies = qjm.getLoggerSetForTests().getLoggersForTests();

  qjm.format(QJMTestUtil.FAKE_NSINFO);
  qjm.recoverUnfinalizedSegments();
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
private void doOutOfSyncTest(int missingOnRecoveryIdx,
    long expectedRecoveryTxnId) throws Exception {
  setupLoggers345();

  QJMTestUtil.assertExistsInQuorum(cluster,
      NNStorage.getInProgressEditsFileName(1));

  // Shut down the specified JN, so it's not present during recovery.
  cluster.getJournalNode(missingOnRecoveryIdx).stopAndJoin(0);

  // Make a new QJM
  qjm = createSpyingQJM();

  qjm.recoverUnfinalizedSegments();
  checkRecovery(cluster, 1, expectedRecoveryTxnId);
}
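// A minimal usage sketch (not from the source): how doOutOfSyncTest() might be
// invoked. The test name and the expected recovery txid below are illustrative
// assumptions; the real expected value depends on how setupLoggers345() leaves
// the three loggers out of sync.
@Test
public void testRecoveryWithOneJournalNodeMissing() throws Exception {
  // Recover with JN 0 absent; the quorum formed by the remaining two
  // JournalNodes decides how far the in-progress segment is recovered.
  doOutOfSyncTest(0, 4L);
}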
/**
 * Test whether JNs can correctly handle an edit log that cannot be decoded.
 */
@Test
public void testScanEditLog() throws Exception {
  // use a future layout version
  journal.startLogSegment(makeRI(1), 1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1);

  // in the segment we write garbage editlog, which can be scanned but
  // cannot be decoded
  final int numTxns = 5;
  byte[] ops = QJMTestUtil.createGabageTxns(1, 5);
  journal.journal(makeRI(2), 1, 1, numTxns, ops);

  // verify the in-progress editlog segment
  SegmentStateProto segmentState = journal.getSegmentInfo(1);
  assertTrue(segmentState.getIsInProgress());
  Assert.assertEquals(numTxns, segmentState.getEndTxId());
  Assert.assertEquals(1, segmentState.getStartTxId());

  // finalize the segment and verify it again
  journal.finalizeLogSegment(makeRI(3), 1, numTxns);
  segmentState = journal.getSegmentInfo(1);
  assertFalse(segmentState.getIsInProgress());
  Assert.assertEquals(numTxns, segmentState.getEndTxId());
  Assert.assertEquals(1, segmentState.getStartTxId());
}
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  cluster = new MiniJournalCluster.Builder(conf)
      .build();
  cluster.waitActive();
  qjm = createSpyingQJM();
  spies = qjm.getLoggerSetForTests().getLoggersForTests();

  qjm.format(QJMTestUtil.FAKE_NSINFO);
  qjm.recoverUnfinalizedSegments();
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
private long writeSegmentUntilCrash(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long txid, int numTxns,
    Holder<Throwable> thrown) {
  long firstTxId = txid;
  long lastAcked = txid - 1;
  try {
    EditLogOutputStream stm = qjm.startLogSegment(txid);
    for (int i = 0; i < numTxns; i++) {
      QJMTestUtil.writeTxns(stm, txid++, 1);
      lastAcked++;
    }
    stm.close();
    qjm.finalizeLogSegment(firstTxId, lastAcked);
  } catch (Throwable t) {
    thrown.held = t;
  }
  return lastAcked;
}
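// Illustrative only (not from the source): how a caller might drive
// writeSegmentUntilCrash() while a fault is injected into one JournalNode,
// then inspect the Holder to see whether the writer actually failed. The txid
// range and the log message are assumptions for the sketch.
Holder<Throwable> thrown = new Holder<Throwable>(null);
long lastAcked = writeSegmentUntilCrash(cluster, qjm, 1, 100, thrown);
if (thrown.held != null) {
  // The writer crashed mid-segment; lastAcked is the highest txid that was
  // acked before the failure, so recovery can be verified against it.
  LOG.info("Writer failed after txid " + lastAcked, thrown.held);
}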
@Before public void setup() throws Exception { conf = new Configuration(); // Don't retry connections - it just slows down the tests. conf.setInt("ipc.client.connect.max.retries", 0); conf.setLong(JournalConfigKeys.DFS_QJOURNAL_CONNECT_TIMEOUT_KEY, 100); cluster = new MiniJournalCluster.Builder(conf) .build(); qjm = createSpyingQJM(); spies = qjm.getLoggerSetForTests().getLoggersForTests(); qjm.transitionJournal(QJMTestUtil.FAKE_NSINFO, Transition.FORMAT, null); qjm.recoverUnfinalizedSegments(); assertEquals(1, qjm.getLoggerSetForTests().getEpoch()); }
@Before public void setup() throws Exception { conf = new Configuration(); // Don't retry connections - it just slows down the tests. conf.setInt("ipc.client.connect.max.retries", 0); conf.setLong(JournalConfigKeys.DFS_QJOURNAL_CONNECT_TIMEOUT_KEY, 100); cluster = new MiniJournalCluster.Builder(conf) .build(); qjm = TestQuorumJournalManager.createSpyingQJM(conf, cluster, JID, FAKE_NSINFO); qjm.transitionJournal(QJMTestUtil.FAKE_NSINFO, Transition.FORMAT, null); qjm.recoverUnfinalizedSegments(); assertEquals(1, qjm.getLoggerSetForTests().getEpoch()); }
/**
 * Ensure that refresh functionality does not work for finalized streams (at
 * startup).
 */
@Test
public void testRefreshOnlyForInprogress() throws Exception {
  // start new segment
  EditLogOutputStream stm = qjm.startLogSegment(0);
  // write a bunch of transactions
  QJMTestUtil.writeTxns(stm, 0, 10);
  qjm.finalizeLogSegment(0, 9);

  // get input stream
  List<EditLogInputStream> streams = Lists.newArrayList();
  // get only finalized streams
  qjm.selectInputStreams(streams, 0, false, false);

  try {
    // try refreshing the stream (this is startup mode, so in-progress
    // segments are not allowed) -> refresh should fail
    streams.get(0).refresh(10, 0);
    fail("The stream should not allow refreshing");
  } catch (IOException e) {
    LOG.info("Expected exception: ", e);
  }
}
@Before
public void setup() throws Exception {
  File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) +
      File.separator + "TestJournalNode");
  FileUtil.fullyDelete(editsDir);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      editsDir.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
      "0.0.0.0:0");
  int port = MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);
  httpAddress = "http://localhost:" + port;

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();
  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(QuorumJournalManager
      .journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
  journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null);
}
@Before
public void setup() throws Exception {
  File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) +
      File.separator + "TestJournalNode");
  FileUtil.fullyDelete(editsDir);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      editsDir.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
      "0.0.0.0:0");
  MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();
  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(QuorumJournalManager
      .journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);

  ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId,
      jn.getBoundIpcAddress());
}
/**
 * Assume that a client is writing to a journal, but loses its connection in
 * the middle of a segment. Thus, any future journal() calls in that segment
 * may fail, because some txns were missed while the connection was down.
 *
 * Eventually, the connection comes back, and the NN tries to start a new
 * segment at a higher txid. This should abort the old one and succeed.
 */
@Test
public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 1);

  // Start a segment at txid 1, and write a batch of 3 txns.
  journal.startLogSegment(makeRI(1), 1);
  journal.journal(makeRI(2), 1, 1, 3,
      QJMTestUtil.createTxnData(1, 3));

  GenericTestUtils.assertExists(
      journal.getJournalStorage().getInProgressEditLog(1));

  // Try to start a new segment at txid 6. This should abort the old segment
  // and then succeed, allowing us to write txids 6-8.
  journal.startLogSegment(makeRI(3), 6);
  journal.journal(makeRI(4), 6, 6, 3,
      QJMTestUtil.createTxnData(6, 3));

  // The old segment should *not* be finalized.
  GenericTestUtils.assertExists(
      journal.getJournalStorage().getInProgressEditLog(1));
  GenericTestUtils.assertExists(
      journal.getJournalStorage().getInProgressEditLog(6));
}
/**
 * Assume that a client is writing to a journal, but loses its connection
 * in the middle of a segment. Thus, any future journal() calls in that
 * segment may fail, because some txns were missed while the connection was
 * down.
 *
 * Eventually, the connection comes back, and the NN tries to start a new
 * segment at a higher txid. This should abort the old one and succeed.
 */
@Test (timeout = 10000)
public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 1);

  // Start a segment at txid 1, and write a batch of 3 txns.
  journal.startLogSegment(makeRI(1), 1);
  journal.journal(makeRI(2), 1, 1, 3,
      QJMTestUtil.createTxnData(1, 3));

  GenericTestUtils.assertExists(
      journal.getStorage().getInProgressEditLog(1));

  // Try to start a new segment at txid 6. This should abort the old segment
  // and then succeed, allowing us to write txids 6-8.
  journal.startLogSegment(makeRI(3), 6);
  journal.journal(makeRI(4), 6, 6, 3,
      QJMTestUtil.createTxnData(6, 3));

  // The old segment should *not* be finalized.
  GenericTestUtils.assertExists(
      journal.getStorage().getInProgressEditLog(1));
  GenericTestUtils.assertExists(
      journal.getStorage().getInProgressEditLog(6));
}